# experiment.py
import tensorflow as tf
from keras_applications.vgg16 import VGG16
from keras_applications.resnet50 import ResNet50
from regression_models import *
from RST import *
from extract_features import *
from loss import *
from processingImages import *
from data import *
import numpy as np
import os
def eval(gallery, probe, imgs_per_subj=4, num_subjs=30, num_test_imgs=120):
    """Evaluate using a cosine-similarity confusion matrix.

    Both feature matrices are L2-normalised row by row, every gallery row is
    compared with every probe row, and the similarities are max-pooled over
    consecutive groups of ``imgs_per_subj`` gallery rows.

    Args:
        gallery: 2-D array of gallery features, one row per image.
        probe: 2-D array of probe features, one row per image.
        imgs_per_subj: number of consecutive gallery rows pooled together.
        num_subjs: number of subjects (rows of the returned matrix).
        num_test_imgs: number of probe images (columns of the returned matrix).

    Returns:
        ``(acc, C)`` where ``C`` has shape ``(num_subjs, num_test_imgs)`` and
        ``acc`` is the fraction of columns whose arg-max row equals the
        expected label.

    Notes:
        * Expected labels are ``0..num_subjs-1`` repeated
          ``num_test_imgs // num_subjs`` times, so ``num_test_imgs`` should be
          a multiple of ``num_subjs``.
        * NOTE(review): the pooled scores are produced probe-major and then
          reshaped straight to ``(num_subjs, num_test_imgs)``; this presumably
          relies on a specific gallery/probe ordering -- confirm before
          changing it, since reported numbers depend on it.
        * This function shadows the builtin ``eval``; the name is kept because
          callers use it.
    """
    def _unit_rows(feats):
        norms = np.sqrt(np.sum(feats ** 2, axis=1, keepdims=True))
        # An all-zero row has no direction: divide it by 1 so it stays zero
        # instead of turning into NaN and poisoning the arg-max below.
        norms[norms == 0] = 1
        return feats / norms

    # cosine similarity
    probe = _unit_rows(probe)
    gallery = _unit_rows(gallery)
    C = np.matmul(gallery, probe.T)

    # best score within each group of `imgs_per_subj` gallery images
    C = np.reshape(C.T, (-1, imgs_per_subj))
    C = np.amax(C, axis=1).reshape(num_subjs, num_test_imgs)

    pred = np.argmax(C, axis=0).reshape(num_test_imgs, 1)
    gt = np.tile(np.expand_dims(np.arange(num_subjs), axis=1),
                 (num_test_imgs // num_subjs, 1))
    acc = np.sum(np.array(pred == gt, dtype='float')) / float(num_test_imgs)
    return acc, C
def getModel(model_type, layer_name, pooling):
    """Build the frozen ImageNet backbone used for feature extraction.

    Args:
        model_type: 'vgg16' or 'resnet50' select a network; any other value
            (e.g. the hand-crafted 'dsift' / 'patch' extractors) needs no
            network and yields ``model = None``.
        layer_name: name of an intermediate layer whose output should be
            exposed, or None to use only the full network.
        pooling: forwarded unchanged to the keras_applications constructor.

    Returns:
        ``(model, inter_model)``. ``inter_model`` is a ``tf.keras.Model``
        ending at ``layer_name`` when both a network and a layer name are
        available; otherwise it is the empty-list placeholder ``[]``.
    """
    # Without this default, non-CNN model types hit an UnboundLocalError at
    # the return statement.
    model = None
    inter_model = []

    if model_type == 'vgg16':
        model = VGG16(weights='imagenet', include_top=False, pooling=pooling,
                      backend=tf.keras.backend,
                      layers=tf.keras.layers,
                      models=tf.keras.models,
                      utils=tf.keras.utils)
        model.trainable = False
    elif model_type == 'resnet50':
        model = ResNet50(weights='imagenet', include_top=False, pooling=pooling,
                         backend=tf.keras.backend,
                         layers=tf.keras.layers,
                         models=tf.keras.models,
                         utils=tf.keras.utils)
        model.trainable = False

    if model is not None and layer_name is not None:
        inter_model = tf.keras.Model(inputs=model.input,
                                     outputs=model.get_layer(layer_name).output)
    return model, inter_model
def _minmax_scale(feats):
    """Rescale every column of a 2-D array to [-1, 1] using that array's own min/max."""
    lo = np.amin(feats, axis=0, keepdims=True)
    hi = np.amax(feats, axis=0, keepdims=True)
    return (feats - lo) / (hi - lo) * 2.0 - 1.0


def _flatten_and_scale(vis_feats, thm_feats, vis_kpts, thm_kpts):
    """Flatten both feature sets to (n, channels), append keypoints if both exist, min-max scale."""
    depth = vis_feats.shape[-1]
    vis_feats = vis_feats.reshape(-1, depth)
    thm_feats = thm_feats.reshape(-1, depth)
    if vis_kpts is not None and thm_kpts is not None:
        vis_feats = np.concatenate((vis_feats, vis_kpts.reshape(-1, 2)), axis=1)
        thm_feats = np.concatenate((thm_feats, thm_kpts.reshape(-1, 2)), axis=1)
    return _minmax_scale(vis_feats), _minmax_scale(thm_feats)


def _embed_in_batches(sess, output, placeholder, data, batch_size):
    """Run `output` over `data` in inference mode, `batch_size` rows at a time; return (n, -1)."""
    chunks = [
        sess.run(output, feed_dict={placeholder: data[lo:lo + batch_size],
                                    tf.keras.backend.learning_phase(): 0})
        for lo in range(0, data.shape[0], batch_size)
    ]
    return np.concatenate(chunks, axis=0).reshape(data.shape[0], -1)


def _cos_sim_filename(protocol, model_type, protofile_, galleryProtofile_):
    """Build the .npy file name under which the cosine-similarity matrix is stored."""
    if "protocol1" in protocol or "protocol2" in protocol:
        tag = os.path.basename(protofile_)[:4]
    elif "protocol3" in protocol:
        tag = os.path.basename(galleryProtofile_)[:6]
    else:
        raise ValueError("unsupported protocol: {!r}".format(protocol))
    return 'cos_sim_v2t_' + protocol + '_' + model_type + '_' + tag + '_prop' + '.npy'


def run_experiment(imgroot, model_type, layer_name=None, pooling=None, pca_flag=False, dogfilter=True, crop=[39, 123, 239, 323], DPM=False, protocol=None, Proposed=False, loss_flag=False, train_flag = False):
    """Run the five-fold experiment for one protocol.

    For each of the five folds this loads the training and testing image
    lists, extracts features, optionally reduces them with PCA, and then
    either (``DPM=True``) trains and applies the DPM mapping or
    (``Proposed=True``) trains or restores the TensorFlow model built from
    ``residualSpectralTransform`` and the selected loss. Test features are
    scored with ``eval``.

    Args:
        imgroot: image root directory. NOTE: it is overwritten by the value
            returned from ``get_protocols`` on every fold.
        model_type: 'dsift', 'patch', 'vgg16' or 'resnet50'.
        layer_name: intermediate layer to extract CNN features from, or None.
        pooling: pooling mode forwarded to ``getModel``.
        pca_flag: when True, reduce features to 64 components with
            ``pca_reduce`` (fitted on training features, reused for testing).
        dogfilter, crop: forwarded to ``load_images``.
        DPM: when True, run the DPM baseline.
        protocol: protocol name, e.g. 'protocol1_r1_b' or a name containing
            'protocol3'. Required.
        Proposed: when True (and ``DPM`` is not), run the proposed network.
        loss_flag: False selects ``crossdomainidloss``, otherwise
            ``domaininvarianceloss``.
        train_flag: True trains from scratch; otherwise the best saved
            checkpoint of the fold is restored and evaluated.

    Returns:
        ``(acc, acc_)``. ``acc`` is never filled and is always an empty list.
        ``acc_`` holds one test accuracy per fold; in training mode this is
        the best accuracy reached during that fold.

    Raises:
        TypeError: if ``protocol`` is None.
        ValueError: for an unsupported protocol or model type.
    """
    if protocol is None:
        raise TypeError("run_experiment() requires a protocol name such as 'protocol1_r1_b'")
    if isinstance(crop, list):
        # Never hand the shared default list to code that might mutate it.
        crop = list(crop)

    acc = []   # Train accuracy (never populated)
    acc_ = []  # Test accuracy, one entry per fold
    count = 0

    # Get VGG16 or Resnet50 model
    model, inter_model = getModel(model_type, layer_name, pooling)

    for fold in range(5):
        # Get protocol files
        imgroot, protofile, protofile_, galleryProtofile_, probesProtofile_, label = get_protocols(fold, protocol)

        # Get training filenames
        if "protocol1" in protocol or "protocol2" in protocol:
            filenames = np.array(getfilenames(protofile))
            visfiles = filenames[:, 0].tolist()
            thmfiles = filenames[:, 1].tolist()
            labels_tr = read_integers(label)
        elif "protocol3" in protocol:
            visfiles, thmfiles, labels_tr = subsample_p3_train(protofile, label)
            labels_tr = np.asarray([int(x) for x in labels_tr])
        else:
            raise ValueError("unsupported protocol: {!r}".format(protocol))
        visfiles = [os.path.join(imgroot, 'Registeredvisible', v) for v in visfiles]
        thmfiles = [os.path.join(imgroot, 'Registeredthermal', v) for v in thmfiles]

        # Load training set
        vi = load_images(visfiles, dogfilter=dogfilter, crop=crop, test=True)
        s0 = load_images(thmfiles, dogfilter=dogfilter, crop=crop, test=True)
        ratio = vi.shape[0] / (s0.shape[0] + vi.shape[0])
        N = len(labels_tr)

        # Get testing filenames
        if protocol in ('protocol1_r1_b', 'protocol2_r1_b'):
            visfiles_, thmfiles_ = protocol1_range1_baseline(imgroot, protofile_)
        elif protocol in ('protocol1_r1_e', 'protocol2_r1_e'):
            visfiles_, thmfiles_ = protocol1_range1_expr(imgroot, protofile_)
        elif protocol in ('protocol1_r2_b', 'protocol2_r2_b'):
            visfiles_, thmfiles_ = protocol1_range2_baseline(imgroot, protofile_)
        elif protocol in ('protocol1_r2_e', 'protocol2_r2_e'):
            visfiles_, thmfiles_ = protocol1_range2_expr(imgroot, protofile_)
        elif protocol in ('protocol1_r3_b', 'protocol2_r3_b'):
            visfiles_, thmfiles_ = protocol1_range3_baseline(imgroot, protofile_)
        elif protocol in ('protocol1_r3_e', 'protocol2_r3_e'):
            visfiles_, thmfiles_ = protocol1_range3_expr(imgroot, protofile_)
        elif 'protocol3' in protocol:
            visfiles_, thmfiles_ = protocol3(imgroot, galleryProtofile_, probesProtofile_)
        else:
            raise ValueError("no test split defined for protocol: {!r}".format(protocol))

        # Load testing set
        vi_ = load_images(visfiles_, dogfilter=dogfilter, crop=crop, test=True)
        s0_ = load_images(thmfiles_, dogfilter=dogfilter, crop=crop, test=True)
        ratio_ = vi_.shape[0] / (s0_.shape[0] + vi_.shape[0])

        # Feature extraction (order: train visible, train thermal, test visible, test thermal)
        if model_type in ('dsift', 'patch'):
            if model_type == 'dsift':
                feats_size = 24
            # NOTE(review): 'patch' reuses the dsift extractor and never sets
            # feats_size, so it cannot be combined with Proposed=True -- confirm intent.
            vif, vik = extract_dsift_feature(vi)
            s0f, s0k = extract_dsift_feature(s0)
            vif_, vik_ = extract_dsift_feature(vi_)
            s0f_, s0k_ = extract_dsift_feature(s0_)
        elif model_type in ('vgg16', 'resnet50'):
            feats_size = 25
            extract = extract_vgg16_feature if model_type == 'vgg16' else extract_resnet50_feature
            net = model if layer_name is None else inter_model
            vif, vik = extract(vi, net)
            s0f, s0k = extract(s0, net)
            vif_, vik_ = extract(vi_, net)
            s0f_, s0k_ = extract(s0_, net)
        else:
            raise ValueError("unsupported model_type: {!r}".format(model_type))

        # pca
        if pca_flag is True:
            # training features: fit the projection
            depth = vif.shape[-1]
            feats = np.concatenate((vif.reshape(-1, depth), s0f.reshape(-1, depth)), axis=0)
            feats, trans, scaler = pca_reduce(feats, n_components=64, trans=None, scaler=None)
            M = int(feats.shape[0] * ratio)
            vif = feats[:M, :]
            s0f = feats[M:, :]
            # testing features: reuse the fitted projection
            depth = vif_.shape[-1]
            feats_ = np.concatenate((vif_.reshape(-1, depth), s0f_.reshape(-1, depth)), axis=0)
            feats_, trans_, scaler_ = pca_reduce(feats_, n_components=64, trans=trans, scaler=scaler)
            M = int(feats_.shape[0] * ratio_)
            vif_ = feats_[:M, :]
            s0f_ = feats_[M:, :]

        # Deep Perceptural Mapping
        if DPM is True:
            # Run DPM on training data
            vif, s0f = _flatten_and_scale(vif, s0f, vik, s0k)
            # train the dpm network
            nn = train_dpm(s0f, vif)
            # map features
            nn = nnff(nn, s0f, np.zeros_like(s0f))
            s0f = nn['a'][-1]

            # Run DPM on testing data (scaled with the test set's own min/max)
            vif_, s0f_ = _flatten_and_scale(vif_, s0f_, vik_, s0k_)
            nn = nnff(nn, s0f_, np.zeros_like(s0f_))
            s0f_ = nn['a'][-1]

            # one descriptor per image
            vif_ = vif_.reshape(vi_.shape[0], -1)
            s0f_ = s0f_.reshape(s0_.shape[0], -1)
            accu_, C = eval(vif_, s0f_, imgs_per_subj=4, num_subjs=30, num_test_imgs=s0_.shape[0])
            acc_.append(accu_)

        elif Proposed is True:
            vif, s0f = _flatten_and_scale(vif, s0f, vik, s0k)
            vif_, s0f_ = _flatten_and_scale(vif_, s0f_, vik_, s0k_)

            # back to feature maps: 64 feature channels + 2 keypoint coordinates
            vif = vif.reshape((N, feats_size, feats_size, 66))
            s0f = s0f.reshape((N, feats_size, feats_size, 66))
            vif_ = vif_.reshape((len(vi_), feats_size, feats_size, 66))
            s0f_ = s0f_.reshape((len(s0_), feats_size, feats_size, 66))
            # the training loop fancy-indexes the labels, which a plain list cannot do
            labels_tr = np.asarray(labels_tr)

            # training parameters
            batch_size = 16
            nsteps = 1500   # number of total steps
            nsteps_ = 100   # number of steps to train visible

            # build graph
            side = 24 if model_type == 'dsift' else 25
            x = tf.placeholder(dtype=tf.float32, shape=(None, side, side, 64 + 2))
            y = tf.placeholder(dtype=tf.float32, shape=(None, side, side, 64 + 2))
            labels = tf.placeholder(tf.float32, shape=None)
            labels_ = tf.placeholder(tf.float32, shape=(None, 2))

            fy, var_list1 = residualSpectralTransform(y)
            if loss_flag is False:
                losses, logits, var_lists, feats0_flat, feats1_flat = crossdomainidloss(x, fy, labels, labels_, var_list0=[], var_list1=var_list1)
            else:
                losses, logits, var_lists, feats0_flat, feats1_flat = domaininvarianceloss(x, fy, labels, labels_, var_list0=[], var_list1=var_list1)

            opt0 = tf.train.AdamOptimizer(1e-3)
            opt1 = tf.train.AdamOptimizer(1e-3)
            train_op0 = opt0.minimize(losses[0], var_list=var_lists[0])
            train_op1 = opt1.minimize(losses[1], var_list=var_lists[1])
            init_op = tf.initialize_variables(var_lists[0] + var_lists[1] + opt0.variables() + opt1.variables())

            # start session
            sess = tf.keras.backend.get_session()
            tf.keras.backend.set_session(sess)
            saver = tf.train.Saver()

            best = -1 * np.ones((5,))
            best_step = -1 * np.ones((5,))
            start = -1 * np.ones((5,))
            cos_similarity = _cos_sim_filename(protocol, model_type, protofile_, galleryProtofile_)

            if train_flag is True:
                # train the network
                sess.run(init_op)
                print ("count = ", count)
                for step in range(nsteps):
                    # evaluate on the test split before every update
                    gallery = _embed_in_batches(sess, fy, y, vif_, batch_size)
                    probe = _embed_in_batches(sess, feats0_flat, x, s0f_, batch_size)
                    # A separate name keeps the per-fold list `acc_` intact.
                    step_acc, C = eval(gallery, probe, imgs_per_subj=4, num_subjs=30, num_test_imgs=probe.shape[0])
                    if step == 0:
                        start[count] = step_acc
                    # Save the best model
                    if step_acc > best[count]:
                        np.save(cos_similarity, C)
                        best[count] = step_acc
                        best_step[count] = step
                        save_best_model(fold, model_type, sess, saver)
                    print('step = {}, acc_ = {}, init_acc = {}, best_acc = {}, best_step = {}'.format(step, step_acc, start[count], best[count], best_step[count]))

                    # Draw one random batch; indexing only the batch avoids
                    # copying the whole training set on every step.
                    ii = np.random.permutation(N)[:batch_size]
                    vi_batch = vif[ii]
                    s0_batch = s0f[ii]
                    labels_batch = labels_tr[ii]
                    labels_batch_ = 0.5 * np.ones((vi_batch.shape[0], 2))
                    feed = {y: vi_batch, x: s0_batch, labels: labels_batch, labels_: labels_batch_, tf.keras.backend.learning_phase(): 1}
                    # first `nsteps_` steps optimise losses[0], the rest losses[1]
                    train_op = train_op0 if step < nsteps_ else train_op1
                    _, L = sess.run([train_op, losses], feed_dict=feed)
                    print('{} {} {}'.format(step, L[0], L[1]))
                acc_.append(best[count])
                count = count + 1
            else:
                print("Loading pretrained network...")
                checkpoint_path = load_best_model(fold, model_type)
                saver.restore(sess, tf.train.latest_checkpoint(checkpoint_path))
                gallery = _embed_in_batches(sess, fy, y, vif_, batch_size)
                probe = _embed_in_batches(sess, feats0_flat, x, s0f_, batch_size)
                accu_, C = eval(gallery, probe, imgs_per_subj=4, num_subjs=30, num_test_imgs=probe.shape[0])
                np.save(cos_similarity, C)
                acc_.append(accu_)

        print("acc_ =", acc_)
    return acc, acc_