#!/usr/bin/env python
#/************************************************************
#*
#* Licensed to the Apache Software Foundation (ASF) under one
#* or more contributor license agreements. See the NOTICE file
#* distributed with this work for additional information
#* regarding copyright ownership. The ASF licenses this file
#* to you under the Apache License, Version 2.0 (the
#* "License"); you may not use this file except in compliance
#* with the License. You may obtain a copy of the License at
#*
#* http://www.apache.org/licenses/LICENSE-2.0
#*
#* Unless required by applicable law or agreed to in writing,
#* software distributed under the License is distributed on an
#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#* KIND, either express or implied. See the License for the
#* specific language governing permissions and limitations
#* under the License.
#*
#*************************************************************/
'''
This script includes the Model class and its subclasses, with which
users can configure model parameters.
'''
import sys, re, subprocess
from singa.layer import *
from singa.utils.utility import *
from singa.utils.message import *
from google.protobuf import text_format
class Model(object):
''' Configure model parameters
- add(): add layer
- compile(): specify Updater and Cluster protos
- build(): construct a model (i.e., NetProto)
- fit(): run singa for training
- evaluate(): run singa for testing
'''
def __init__(self, name='my model', argv=None, label=False):
'''
optional
name = (string) // name of model/job
argv = (list) // command-line arguments, e.g., sys.argv
label = (bool) // whether to add a separate label layer (deprecated)
'''
self.jobconf = Message('Job', name=name).proto
self.layers = []
self.label = label
self.argv = argv
self.result = None
self.last_checkpoint_path = None
self.cudnn = False
self.accuracy = False
def add(self, layer):
'''
add a layer; overridden by subclasses (e.g., Energy, Sequential)
'''
pass
def exist_datalayer(self, phase):
'''
check if a data layer exists for the given phase
'''
for ly in self.layers:
if enumPhase(phase) in ly.layer.include:
return True
return False
def compile(self, optimizer=None, cluster=None,
loss=None, topk=1, **kwargs):
'''
required
optimizer = (Updater) // updater settings, e.g., SGD
cluster = (Cluster) // cluster settings
optional
loss = (string) // name of loss function type
topk = (int) // number of top predictions considered when computing accuracy
'''
assert optimizer is not None, 'optimizer (Updater component) should be set'
assert cluster is not None, 'cluster (Cluster component) should be set'
setval(self.jobconf, updater=optimizer.proto)
setval(self.jobconf, cluster=cluster.proto)
# take care of loss function layer
if loss is None:
print 'loss layer is not set'
else:
if hasattr(self.layers[-1], 'mask'):
ly = self.layers[-1].mask
else:
ly = self.layers[-1].layer
# take care of the last layer
if ly.type == enumLayerType('softmax'):
# revise the last layer
if loss == 'categorical_crossentropy':
setval(ly, type=enumLayerType('softmaxloss'))
setval(ly.softmaxloss_conf, topk=topk)
elif loss == 'mean_squared_error':
setval(ly, type=enumLayerType('euclideanloss'))
else:
# add new layer
if loss == 'categorical_crossentropy':
self.add(Activation('softmaxloss', topk=topk))
elif loss == 'mean_squared_error':
self.add(Activation('euclideanloss'))
elif loss == 'user_loss_rnnlm': # user-defined loss layer
self.add(UserLossRNNLM(nclass=kwargs['nclass'],
vocab_size=kwargs['in_dim']))
def build(self):
'''
construct the NeuralNet proto
'''
net = NetProto()
slyname = self.layers[0].layer.name
for i in range(len(self.layers)):
ly = net.layer.add()
ly.CopyFrom(self.layers[i].layer)
lastly = ly
if self.layers[i].is_datalayer:
continue
getattr(ly, 'srclayers').append(slyname)
slyname = ly.name
if hasattr(self.layers[i], 'mask'):
mly = net.layer.add()
mly.CopyFrom(self.layers[i].mask)
getattr(mly, 'srclayers').append(slyname)
slyname = mly.name
lastly = mly
if hasattr(self.layers[i], 'bidirect'):
bly = net.layer.add()
bly.CopyFrom(self.layers[i].bidirect)
getattr(bly, 'srclayers').append(slyname)
# deal with label layer (deprecated)
if self.label:
label_layer = Layer(name='label', type=kLabel)
ly = net.layer.add()
ly.CopyFrom(label_layer.layer)
getattr(ly, 'srclayers').append(self.layers[0].layer.name)
getattr(lastly, 'srclayers').append(label_layer.layer.name)
else:
if lastly.name == 'RBMVis':
# bly was set in the loop above (RBM layers carry a 'bidirect' field)
getattr(lastly, 'srclayers').append(bly.name)
else:
getattr(lastly, 'srclayers').append(self.layers[0].layer.name)
if self.accuracy:
smly = net.layer.add()
smly.CopyFrom(Layer(name='softmax', type=kSoftmax).layer)
setval(smly, include=kTest)
getattr(smly, 'srclayers').append(self.layers[-1].layer.name)
aly = net.layer.add()
aly.CopyFrom(Accuracy().layer)
setval(aly, include=kTest)
getattr(aly, 'srclayers').append('softmax')
getattr(aly, 'srclayers').append(self.layers[0].layer.name)
# use of cudnn
if self.cudnn:
self.set_cudnn_layer_type(net)
setval(self.jobconf, neuralnet=net)
def fit(self, data=None, alg='bp', nb_epoch=0,
with_test=False, execpath='', device=None, **fields):
'''
required
data = (Data) // Data class object for training data
nb_epoch = (int) // the number of training steps
optional
alg = (string) // training algorithm, e.g., 'bp', 'cd' ('bp' by default)
with_test = (bool) // flag if singa also runs for test data
execpath = (string) // path to user's own singa executable
device = (int/list) // a list of gpu ids
**fields (KEY=VALUE)
batch_size = (int) // batch size for training data
train_steps = (int) // number of steps for training, i.e., epochs
disp_freq = (int) // frequency of displaying training info
disp_after = (int) // start displaying after this number of steps
validate_data = (Data) // validation data, specified in load_data()
validate_freq = (int) // frequency of validation
validate_steps = (int) // total number of steps for validation
validate_after = (int) // start validation after this number of steps
checkpoint_path = (string) // path to the checkpoint file
checkpoint_freq = (int) // frequency of checkpointing
checkpoint_after = (int) // start checkpointing after this number of steps
'''
assert data is not None, 'Training data should be set'
assert nb_epoch > 0, 'Training steps should be set'
if 'batch_size' in fields: # if new value is set, replace it
setval(data.layer.store_conf, batchsize=fields['batch_size'])
# insert layer for training
if not self.exist_datalayer('train'):
self.layers.insert(0, data)
setval(self.jobconf, train_steps=nb_epoch)
setval(self.jobconf, disp_freq=nb_epoch/10)
if 'disp_freq' in fields:
setval(self.jobconf, disp_freq=fields['disp_freq'])
if 'validate_data' in fields:
self.layers.insert(1, fields['validate_data'])
setval(self.jobconf, validate_freq=nb_epoch/10)
setval(self.jobconf, **fields)
# load checkpoint if it is set
if data.checkpoint is not None:
setval(self.jobconf, checkpoint_path=data.checkpoint)
# save model parameter (i.e., checkpoint_path)
setval(self.jobconf, checkpoint_freq=nb_epoch)
self.last_checkpoint_path = '{0}/step{1}-worker0'.format(
self.jobconf.cluster.workspace, nb_epoch)
# set the TrainOneBatch component, using backpropagation by default
setval(self.jobconf,
train_one_batch=Algorithm(type=enumAlgType(alg)).proto)
# use of cudnn
if device is not None:
setval(self.jobconf, gpu=device)
self.cudnn = True
# start to run singa for training
if not with_test:
self.build() # construct the NeuralNet component
#self.display()
return SingaRun(jobproto=self.jobconf,
argv=self.argv, execpath=execpath)
else:
# run singa in evaluate() with test data
pass
def evaluate(self, data=None, alg='bp',
checkpoint_path=None, execpath='',
device=None, show_acc=False, **fields):
'''
required
data = (Data) // Data class object for testing data
optional
alg = (string) // algorithm type ('bp' by default)
checkpoint_path = (list) // checkpoint path(s)
execpath = (string) // path to user's own singa executable
device = (int/list) // a list of gpu ids
show_acc = (bool) // whether to compute and show the accuracy
**fields (KEY=VALUE)
batch_size = (int) // batch size for testing data
test_freq = (int) // frequency of testing
test_steps = (int) // total number of steps for testing
test_after = (int) // start testing after this number of steps
'''
assert data is not None, 'Testing data should be set'
is_testonly = False
if 'batch_size' in fields: # if new value is set, replace it
setval(data.layer.store_conf, batchsize=fields['batch_size'])
# insert layer for testing
if not self.exist_datalayer('test'):
self.layers.insert(0, data)
# load checkpoint if singa runs only for testing
if not self.exist_datalayer('train'):
is_testonly = True
if checkpoint_path is None:
print 'checkpoint_path has not been specified'
else:
setval(self.jobconf, checkpoint_path=checkpoint_path)
steps = fields['test_steps'] if 'test_steps' in fields else 10
setval(self.jobconf, test_steps=steps)
setval(self.jobconf, **fields)
# set the TrainOneBatch component, using backpropagation by default
setval(self.jobconf,
train_one_batch=Algorithm(type=enumAlgType(alg)).proto)
# use of cudnn
if device is not None:
setval(self.jobconf, gpu=device)
self.cudnn = True
# set True if showing the accuracy
self.accuracy = show_acc
self.build() # construct the NeuralNet component
#--- generate job.conf file for debug purpose
#filename = 'job.conf'
#with open(filename, 'w') as f:
# f.write(text_format.MessageToString(self.jobconf.cluster))
#self.display()
#--- run singa ---
return SingaRun(jobproto=self.jobconf,
argv=self.argv, execpath=execpath, testmode=is_testonly)
#return SingaRun_script(filename=filename, execpath=execpath)
def display(self):
''' print out job proto
'''
print text_format.MessageToString(self.jobconf)
def set_cudnn_layer_type(self, net):
''' convert LayerType to CudnnLayerType
'''
for i in range(len(net.layer)):
ly_type = net.layer[i].type
cudnn_ly_type = ly_type
if ly_type == kCConvolution: cudnn_ly_type = kCudnnConv
elif ly_type == kCPooling: cudnn_ly_type = kCudnnPool
elif ly_type == kLRN: cudnn_ly_type = kCudnnLRN
elif ly_type == kSoftmax: cudnn_ly_type = kCudnnSoftmax
elif ly_type == kSoftmaxLoss: cudnn_ly_type = kCudnnSoftmaxLoss
elif ly_type == kSTanh:
cudnn_ly_type = kCudnnActivation
net.layer[i].activation_conf.type = STANH
elif ly_type == kSigmoid:
cudnn_ly_type = kCudnnActivation
net.layer[i].activation_conf.type = SIGMOID
elif ly_type == kReLU:
cudnn_ly_type = kCudnnActivation
net.layer[i].activation_conf.type = RELU
net.layer[i].type = cudnn_ly_type
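# A minimal end-to-end sketch of the Model workflow above (add -> compile ->
# fit -> evaluate). The data objects and layer arguments are illustrative
# assumptions, not part of this module:
#
#   m = Sequential('mlp', argv=sys.argv)
#   m.add(Dense(10, activation='softmax'))
#   sgd = SGD(lr=0.01, decay=0.0001, momentum=0.9)
#   topo = Cluster(workspace='examples/mnist')
#   m.compile(loss='categorical_crossentropy', optimizer=sgd, cluster=topo)
#   m.fit(train_data, nb_epoch=1000)          # train_data is a Data object
#   result = m.evaluate(test_data, batch_size=100, test_steps=10)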
class Energy(Model):
''' energy model
'''
def __init__(self, name='my model', argv=None, label=False):
super(Energy, self).__init__(name=name, argv=argv, label=label)
def add(self, layer):
if hasattr(layer, 'layer_type'):
if layer.layer_type == kRBMVis:
dim = 0
for i in range(1, len(layer.out_dim)):
parw = Parameter(name='w', init='none', level=i)
parb = Parameter(name='b', init='none', level=i)
dim = layer.out_dim[i-1]
self.layers.append(Dense(dim, w_param=parw, b_param=parb,
activation='sigmoid'))
self.layers.append(layer)
class Sequential(Model):
''' sequential model
'''
def __init__(self, name='my model', argv=None, label=False):
super(Sequential, self).__init__(name=name, argv=argv, label=label)
def add(self, layer):
if hasattr(layer, 'layer_type'):
if layer.layer_type == 'AutoEncoder':
dim = 0
if layer.param_share:
# Encoding
for i in range(1, len(layer.hid_dim)+1):
parw = Parameter(name='w',
init='none', level=i)
parb = Parameter(name='b',
init='none', level=i)
dim = layer.hid_dim[i-1]
if i == len(layer.hid_dim): activation = None
else: activation = layer.activation
self.layers.append(Dense(dim,
w_param=parw, b_param=parb,
activation=activation))
# Decoding
for i in range(len(layer.hid_dim), 0, -1):
parw = Parameter(name=generate_name('w', 2),
init='none')
parb = Parameter(name=generate_name('b', 2),
init='none')
setval(parw.param, share_from='w'+str(i))
setval(parb.param, name='b'+str(i))
if i == 1: dim = layer.out_dim
else: dim = layer.hid_dim[i-2]
self.layers.append(Dense(dim,
w_param=parw, b_param=parb,
activation=layer.activation,
transpose=True))
else:
# MLP
for i in range(1, len(layer.hid_dim)+2):
parw = Parameter(name='w',
init='none', level=i)
parb = Parameter(name='b',
init='none', level=i)
if i == len(layer.hid_dim)+1: dim = layer.out_dim
else: dim = layer.hid_dim[i-1]
self.layers.append(Dense(dim,
w_param=parw, b_param=parb,
activation=layer.activation))
else:
self.layers.append(layer)
else:
self.layers.append(layer)
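# A sketch of how Sequential.add() expands a composite 'AutoEncoder' layer
# into a stack of Dense layers (encoder first, then a decoder whose weights
# are shared via share_from). AutoEncoder is assumed to be provided by
# singa.layer; the dimensions are illustrative:
#
#   m = Sequential('autoencoder', argv=sys.argv)
#   m.add(AutoEncoder(out_dim=784, hid_dim=[1000, 500, 250, 30],
#                     activation='sigmoid', param_share=True))
#   # m.layers now holds 2 * len(hid_dim) Dense layers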
class Store(object):
def __init__(self, **kwargs):
'''
**kwargs
path = (string) // path to the dataset
backend = (string) // storage backend type
batch_size = (int) // batch size of the dataset
shape = (int) // shape of a single data instance
'''
self.proto = Message('Store', **kwargs).proto
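# A hedged example of a Store message for a data layer; the path and the
# backend name are placeholder assumptions:
#
#   store = Store(path='examples/mnist/train_data.bin',
#                 backend='kvfile', batch_size=64)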
class Algorithm(object):
def __init__(self, type=enumAlgType('bp'), **kwargs):
'''
type = (enum) // algorithm type, enumAlgType('bp') by default
'''
alg = Message('Alg', alg=type, **kwargs).proto
if type == enumAlgType('cd'):
setval(alg.cd_conf, **kwargs)
self.proto = alg
class Updater(object):
def __init__(self, upd_type, lr, lr_type,
decay, momentum,
step, step_lr, **fields):
'''
required
upd_type = (enum) // enum type of updater
lr = (float) // base learning rate
decay = (float) // weight decay; ignored if <= 0
momentum = (float) // momentum; ignored if <= 0
step = (int/list) // steps for the 'manual' learning-rate type
step_lr = (float/list) // learning rate(s) at those steps
optional
lr_type = (string) // type of the learning rate ('fixed' by default)
'''
upd = Message('Updater', type=upd_type, **fields).proto
setval(upd.learning_rate, base_lr=lr)
if decay > 0:
setval(upd, weight_decay=decay)
if momentum > 0:
setval(upd, momentum=momentum)
if lr_type == None or lr_type == "fixed":
setval(upd.learning_rate, type=kFixed)
elif lr_type == 'step':
cp = Message('Step', change_freq=60, gamma=0.997)
setval(upd.learning_rate, type=kStep, step_conf=cp.proto)
elif lr_type == 'manual':
cp = Message('FixedStep', step=step, step_lr=step_lr)
setval(upd.learning_rate, type=kFixedStep, fixedstep_conf=cp.proto)
elif lr_type == 'linear':
cp = Message('Linear', change_freq=10, final_lr=0.1)
setval(upd.learning_rate, type=kLinear, linear_conf=cp.proto)
self.proto = upd
class SGD(Updater):
def __init__(self, lr=0.01, lr_type=None,
decay=0, momentum=0,
step=(0), step_lr=(0.01), **fields):
'''
required
lr = (float) // base learning rate
optional
lr_type = (string) // type of learning rate, 'fixed' by default
decay = (float) // weight decay
momentum = (float) // momentum
step = (int/list) // steps for the 'manual' learning-rate type
step_lr = (float/list) // learning rate(s) at those steps
**fields (KEY=VALUE)
'''
assert lr
super(SGD, self).__init__(upd_type=kSGD,
lr=lr, lr_type=lr_type,
decay=decay, momentum=momentum,
step=step, step_lr=step_lr, **fields)
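# A sketch of the learning-rate schedules Updater.__init__ recognizes
# ('fixed', 'step', 'manual', 'linear'); the 'manual' type pairs each step
# with a learning rate, and all values here are illustrative:
#
#   sgd_fixed = SGD(lr=0.01)                     # kFixed
#   sgd_step = SGD(lr=0.01, lr_type='step')      # kStep: decay by gamma
#   sgd_linear = SGD(lr=0.01, lr_type='linear')  # kLinear: anneal to final_lr
#   sgd_manual = SGD(lr=0.01, lr_type='manual',
#                    step=(0, 60000, 65000), step_lr=(0.01, 0.001, 0.0001))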
class AdaGrad(Updater):
def __init__(self, lr=0.01, lr_type=None,
decay=0, momentum=0,
step=(0), step_lr=(0.01), **fields):
'''
required
lr = (float) // base learning rate
optional
lr_type = (string) // type of learning rate, 'fixed' by default
decay = (float) // weight decay
momentum = (float) // momentum
step = (int/list) // steps for the 'manual' learning-rate type
step_lr = (float/list) // learning rate(s) at those steps
**fields (KEY=VALUE)
'''
assert lr
super(AdaGrad, self).__init__(upd_type=kAdaGrad,
lr=lr, lr_type=lr_type,
decay=decay, momentum=momentum,
step=step, step_lr=step_lr, **fields)
class Cluster(object):
""" Specify the cluster topology, e.g., number of workers/servers.
Currently we need to create this object in the .py file and also provide a
cluster configuration file to the command line. TODO(wangwei) update SINGA
code to eliminate the requirement of the cluster configuration file for
training on a single node, and of the cluster object in the .py file for
training in a cluster.
"""
def __init__(self, workspace=None,
nworker_groups=1, nserver_groups=1,
nworkers_per_group=1, nservers_per_group=1,
nworkers_per_procs=1, nservers_per_procs=1,
**fields):
'''
required
workspace = (string) // workspace path
optional
nworker_groups = (int)
nserver_groups = (int)
nworkers_per_group = (int)
nservers_per_group = (int)
nworkers_per_procs = (int)
nservers_per_procs = (int)
**fields
server_worker_separate = (bool)
'''
assert workspace is not None, 'need to set workspace'
self.proto = Message('Cluster', workspace=workspace).proto
# optional
self.proto.nworker_groups = nworker_groups
self.proto.nserver_groups = nserver_groups
self.proto.nworkers_per_group = nworkers_per_group
self.proto.nservers_per_group = nservers_per_group
self.proto.nworkers_per_procs = nworkers_per_procs
self.proto.nservers_per_procs = nservers_per_procs
# other fields
setval(self.proto, **fields)
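# A minimal Cluster sketch; single-node training needs only a workspace,
# while the topology fields scale out workers/servers (paths and counts are
# illustrative):
#
#   topo = Cluster(workspace='examples/cifar10')
#   topo_dist = Cluster(workspace='examples/cifar10',
#                       nworkers_per_group=2, nworkers_per_procs=2)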
def StoreResults(lines):
""" Parsing metrics from each line in the log file.
TODO(wangwei) format the log string to make them uniform for easy parsing
Another approach is creating a protobuf message for metrics, which can be
used for dumping metrics to string and loading perf string back to messages.
"""
resultDic = {}
for line in lines:
# tokenize into words/numbers; '.' is kept so floats stay intact
line = re.findall(r'[\w|*.*]+', line)
if 'Train' in line:
step = line[line.index('step')+1]
if 'accuracy' in line:
resultDic.setdefault(step, {})['acc'] \
= line[line.index('accuracy')+1]
if 'loss' in line:
resultDic.setdefault(step, {})['loss'] \
= line[line.index('loss')+1]
if 'ppl' in line:
resultDic.setdefault(step, {})['ppl'] \
= line[line.index('ppl')+1]
if 'Squared' in line:
resultDic.setdefault(step, {})['se'] \
= line[line.index('Squared')+2]
return resultDic
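# StoreResults returns a dict keyed by training step; both keys and metric
# values stay as strings taken from the log, e.g. (illustrative values):
#
#   {'100': {'acc': '0.9216', 'loss': '0.2914'},
#    '200': {'acc': '0.9312', 'loss': '0.2201'}}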
def SingaRun(jobproto='', argv=None, execpath='', testmode=False):
"""
Run Singa and receive the training/test results.
"""
import singa.driver as driver
d = driver.Driver()
d.InitLog(argv[0])
d.Init(argv)
if testmode:
d.Test(jobproto.SerializeToString())
else:
d.Train(False, jobproto.SerializeToString())
# Get the performance from the latest log file.
# TODO(wangwei) the log file would be overwritten by other running instances
# of the same program, e.g., lt-singa
logfile = '/tmp/singa-log/{0}.ERROR'.format(argv[0].split('/')[-1])
with open(logfile, 'r') as fin:
result = StoreResults(fin.readlines())
return result
def SingaRun_script(filename='', execpath=''):
"""
Deprecated.
Generate the job conf file and run the shell command.
"""
SINGAROOT = '../../../'
conf = 'examples/' + filename
if execpath == '':
cmd = SINGAROOT+'bin/singa-run.sh ' \
+ '-conf %s ' % conf
else:
cmd = SINGAROOT+'bin/singa-run.sh ' \
+ '-conf %s ' % conf \
+ '-exec %s ' % execpath
procs = subprocess.Popen(cmd.strip().split(' '),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
resultDic = {}
outputlines = iter(procs.stdout.readline, '')
resultDic = StoreResults(outputlines)
# TODO: a better format to store the result?
return resultDic