# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
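"""Unit tests for the mxnet.profiler Python API."""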
from __future__ import print_function
import json
import logging
import os
import time
import unittest
from collections import OrderedDict

import numpy as np
import mxnet as mx
from mxnet import profiler

from common import run_in_spawned_process
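

# Helper: configure the profiler to write to `profile_filename` and optionally start it.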
def enable_profiler(profile_filename, run=True, continuous_dump=False, aggregate_stats=False):
    profiler.set_config(profile_symbolic=True,
                        profile_imperative=True,
                        profile_memory=True,
                        profile_api=True,
                        filename=profile_filename,
                        continuous_dump=continuous_dump,
                        aggregate_stats=aggregate_stats)
    print('profile file saved to {}'.format(profile_filename))
    if run:
        profiler.set_state('run')
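

# Profile a few iterations of a large symbolic dot product, starting and stopping
# the profiler mid-run.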
def test_profiler():
    iter_num = 5
    begin_profiling_iter = 2
    end_profiling_iter = 4
    enable_profiler('test_profiler.json', False, False)
    A = mx.sym.Variable('A')
    B = mx.sym.Variable('B')
    C = mx.symbol.dot(A, B)
    executor = C.simple_bind(mx.cpu(1), 'write', A=(4096, 4096), B=(4096, 4096))
    a = mx.random.uniform(-1.0, 1.0, shape=(4096, 4096))
    b = mx.random.uniform(-1.0, 1.0, shape=(4096, 4096))
    a.copyto(executor.arg_dict['A'])
    b.copyto(executor.arg_dict['B'])
    print("execution begin")
    for i in range(iter_num):
        print("Iteration {}/{}".format(i + 1, iter_num))
        if i == begin_profiling_iter:
            # time.clock() was removed in Python 3.8; use time.time() instead
            t0 = time.time()
            profiler.set_state('run')
        if i == end_profiling_iter:
            t1 = time.time()
            profiler.set_state('stop')
        executor.forward()
        c = executor.outputs[0]
        c.wait_to_read()
    print("execution end")
    # Only the iterations between begin_profiling_iter and end_profiling_iter
    # were timed, so average over that window rather than iter_num.
    profiled_iter_num = end_profiling_iter - begin_profiling_iter
    duration = t1 - t0
    print('duration: {0}s'.format(duration))
    print('          {0}ms/operator'.format(duration * 1000 / profiled_iter_num))
    profiler.dump(True)
    profiler.set_state('stop')
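

# Creating a profiler Domain should work while the profiler is enabled.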
def test_profile_create_domain():
    enable_profiler('test_profile_create_domain.json')
    domain = profiler.Domain(name='PythonDomain')
    print("Domain created: {}".format(str(domain)))
    profiler.set_state('stop')
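

# Create a domain with a minimal profiler config and dump the profile while it
# is still running.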
def test_profile_create_domain_dept():
    profiler.set_config(profile_symbolic=True, filename='test_profile_create_domain_dept.json')
    profiler.set_state('run')
    domain = profiler.Domain(name='PythonDomain')
    print("Domain created: {}".format(str(domain)))
    profiler.dump_profile()
    profiler.set_state('stop')
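

# Time a logging-heavy workload inside a profiler Task.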
def test_profile_task():
    def makeParams():
        objects = tuple('foo' for _ in range(50))
        template = ''.join('{%d}' % i for i in range(len(objects)))
        return template, objects

    def doLog():
        template, objects = makeParams()
        for _ in range(100000):
            logging.info(template.format(*objects))

    logging.basicConfig()
    enable_profiler('test_profile_task.json')
    python_domain = profiler.Domain('PythonDomain::test_profile_task')
    task = profiler.Task(python_domain, "test_profile_task")
    task.start()
    start = time.time()
    var = mx.nd.ones((1000, 500))
    doLog()
    var.asnumpy()
    stop = time.time()
    task.stop()
    print('run took: %.3f' % (stop - start))
    profiler.set_state('stop')
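

# Same workload as test_profile_task, measured with a profiler Frame instead.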
def test_profile_frame():
    def makeParams():
        objects = tuple('foo' for _ in range(50))
        template = ''.join('{%d}' % i for i in range(len(objects)))
        return template, objects

    def doLog():
        template, objects = makeParams()
        for _ in range(100000):
            logging.info(template.format(*objects))

    logging.basicConfig()
    enable_profiler('test_profile_frame.json')
    python_domain = profiler.Domain('PythonDomain::test_profile_frame')
    frame = profiler.Frame(python_domain, "test_profile_frame")
    frame.start()
    start = time.time()
    var = mx.nd.ones((1000, 500))
    doLog()
    var.asnumpy()
    stop = time.time()
    frame.stop()
    print('run took: %.3f' % (stop - start))
    profiler.set_state('stop')
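

# Same workload again, measured with a profiler Event; reused by other tests
# with do_enable_profiler=False.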
def test_profile_event(do_enable_profiler=True):
    def makeParams():
        objects = tuple('foo' for _ in range(50))
        template = ''.join('{%d}' % i for i in range(len(objects)))
        return template, objects

    def doLog():
        template, objects = makeParams()
        for _ in range(100000):
            logging.info(template.format(*objects))

    logging.basicConfig()
    if do_enable_profiler:
        enable_profiler('test_profile_event.json')
    event = profiler.Event("test_profile_event")
    event.start()
    start = time.time()
    var = mx.nd.ones((1000, 500))
    doLog()
    var.asnumpy()
    stop = time.time()
    event.stop()
    print('run took: %.3f' % (stop - start))
    if do_enable_profiler:
        profiler.set_state('stop')
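

# pause()/resume() should exclude work done while paused from tuning analysis.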
def test_profile_tune_pause_resume():
    enable_profiler('test_profile_tune_pause_resume.json')
    profiler.pause()
    # "test_profile_task" should *not* show up in tuning analysis
    test_profile_task()
    profiler.resume()
    # "test_profile_event" should show up in tuning analysis
    test_profile_event()
    profiler.pause()
    profiler.set_state('stop')
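

# Exercise a profiler Counter by incrementing and decrementing it in a loop.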
def test_profile_counter(do_enable_profiler=True):
    def makeParams():
        objects = tuple('foo' for _ in range(50))
        template = ''.join('{%d}' % i for i in range(len(objects)))
        return template, objects

    def doLog(counter):
        template, objects = makeParams()
        range_size = 100000
        for i in range(range_size):
            if i <= range_size / 2:
                counter += 1
            else:
                counter -= 1
            logging.info(template.format(*objects))

    if do_enable_profiler:
        enable_profiler('test_profile_counter.json')
    python_domain = profiler.Domain('PythonDomain::test_profile_counter')
    counter = profiler.Counter(python_domain, "PythonCounter::test_profile_counter")
    counter.set_value(5)
    counter += 1
    start = time.time()
    doLog(counter)
    stop = time.time()
    print('run took: %.3f' % (stop - start))
    if do_enable_profiler:
        profiler.set_state('stop')
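

# With continuous_dump enabled, repeated dumps should keep growing the output file.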
def test_continuous_profile_and_instant_marker():
    file_name = 'test_continuous_profile_and_instant_marker.json'
    enable_profiler(file_name, True, True, True)
    python_domain = profiler.Domain('PythonDomain::test_continuous_profile')
    last_file_size = 0
    for i in range(5):
        profiler.Marker(python_domain, "StartIteration-" + str(i)).mark('process')
        print("{}...".format(i))
        test_profile_event(False)
        test_profile_counter(False)
        profiler.dump(False)
        # File size should keep increasing
        new_file_size = os.path.getsize(file_name)
        assert new_file_size >= last_file_size
        last_file_size = new_file_size
    profiler.dump(False)
    debug_str = profiler.dumps()
    assert len(debug_str) > 0
    print(debug_str)
    profiler.set_state('stop')
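

# dumps(format='json') should return valid JSON with Time, Memory, and Unit sections.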
def test_aggregate_stats_valid_json_return():
    file_name = 'test_aggregate_stats_json_return.json'
    enable_profiler(file_name, True, True, True)
    test_profile_event(False)
    debug_str = profiler.dumps(format='json')
    assert len(debug_str) > 0
    target_dict = json.loads(debug_str)
    assert "Memory" in target_dict and "Time" in target_dict and "Unit" in target_dict
    profiler.set_state('stop')
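

# Aggregate stats should honor the sort_by and ascending arguments of dumps().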
def test_aggregate_stats_sorting():
    sort_by_options = {'avg': "Avg", 'min': "Min", 'max': "Max", 'count': "Count"}
    ascending_options = [False, True]

    def check_ascending(lst, asc):
        assert lst == sorted(lst, reverse=not asc)

    def check_sorting(debug_str, sort_by, ascending):
        target_dict = json.loads(debug_str, object_pairs_hook=OrderedDict)
        for domain_name, domain in target_dict['Time'].items():
            lst = [item[sort_by_options[sort_by]] for item_name, item in domain.items()]
            check_ascending(lst, ascending)
        for domain_name, domain in target_dict['Memory'].items():
            lst = [item[sort_by_options[sort_by]] for item_name, item in domain.items()]
            check_ascending(lst, ascending)

    file_name = 'test_aggregate_stats_sorting.json'
    enable_profiler(file_name, True, True, True)
    test_profile_event(False)
    for sb in sort_by_options:
        for asc in ascending_options:
            debug_str = profiler.dumps(format='json', sort_by=sb, ascending=asc)
            check_sorting(debug_str, sb, asc)
    profiler.set_state('stop')
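

# Operator counts in aggregate stats should match the number of actual calls.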
def test_aggregate_duplication():
    file_name = 'test_aggregate_duplication.json'
    enable_profiler(profile_filename=file_name, run=True, continuous_dump=True,
                    aggregate_stats=True)
    inp = mx.nd.zeros(shape=(100, 100))
    y = mx.nd.sqrt(inp)
    inp = inp + 1
    inp = inp + 1
    mx.nd.waitall()
    profiler.dump(False)
    debug_str = profiler.dumps(format='json')
    target_dict = json.loads(debug_str)
    assert 'Time' in target_dict and 'operator' in target_dict['Time'] \
        and 'sqrt' in target_dict['Time']['operator'] \
        and 'Count' in target_dict['Time']['operator']['sqrt'] \
        and '_plus_scalar' in target_dict['Time']['operator'] \
        and 'Count' in target_dict['Time']['operator']['_plus_scalar']
    # sqrt is called once and _plus_scalar twice
    assert target_dict['Time']['operator']['sqrt']['Count'] == 1
    assert target_dict['Time']['operator']['_plus_scalar']['Count'] == 2
    profiler.set_state('stop')
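

# Profiling a custom operator should attribute sub-operator calls (e.g. _zeros)
# to the "Custom Operator" domain.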
def test_custom_operator_profiling(seed=None, file_name=None):
    class Sigmoid(mx.operator.CustomOp):
        def forward(self, is_train, req, in_data, out_data, aux):
            x = in_data[0].asnumpy()
            y = 1.0 / (1.0 + np.exp(-x))
            self.assign(out_data[0], req[0], mx.nd.array(y))
            # Create a dummy matrix using nd.zeros. Test if 'MySigmoid::_zeros' is in dump file
            dummy = mx.nd.zeros(shape=(100, 100))

        def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
            y = out_data[0].asnumpy()
            dy = out_grad[0].asnumpy()
            dx = dy * (1.0 - y) * y
            self.assign(in_grad[0], req[0], mx.nd.array(dx))

    @mx.operator.register('MySigmoid')
    class SigmoidProp(mx.operator.CustomOpProp):
        def __init__(self):
            super(SigmoidProp, self).__init__(True)

        def list_arguments(self):
            return ['data']

        def list_outputs(self):
            return ['output']

        def infer_shape(self, in_shapes):
            data_shape = in_shapes[0]
            output_shape = data_shape
            return (data_shape,), (output_shape,), ()

        def create_operator(self, ctx, in_shapes, in_dtypes):
            return Sigmoid()

    if file_name is None:
        file_name = 'test_custom_operator_profiling.json'
    enable_profiler(profile_filename=file_name, run=True, continuous_dump=True,
                    aggregate_stats=True)
    x = mx.nd.array([0, 1, 2, 3])
    x.attach_grad()
    with mx.autograd.record():
        y = mx.nd.Custom(x, op_type='MySigmoid')
    y.backward()
    mx.nd.waitall()
    profiler.dump(False)
    debug_str = profiler.dumps(format='json')
    target_dict = json.loads(debug_str)
    assert 'Time' in target_dict and 'Custom Operator' in target_dict['Time'] \
        and 'MySigmoid::pure_python' in target_dict['Time']['Custom Operator'] \
        and '_backward_MySigmoid::pure_python' in target_dict['Time']['Custom Operator'] \
        and 'MySigmoid::_zeros' in target_dict['Time']['Custom Operator']
    profiler.set_state('stop')
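

# Multiple custom operators should keep their sub-operator stats separate, and
# _plus_scalar called outside any custom op should land in the "operator" domain.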
def test_custom_operator_profiling_multiple_custom_ops_imperative(seed=None, mode='imperative',
                                                                  file_name=None):
    class MyAdd(mx.operator.CustomOp):
        def forward(self, is_train, req, in_data, out_data, aux):
            self.assign(out_data[0], req[0], in_data[0] + 1)

        def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
            self.assign(in_grad[0], req[0], out_grad[0])

    @mx.operator.register('MyAdd1')
    class MyAdd1Prop(mx.operator.CustomOpProp):
        def __init__(self):
            super(MyAdd1Prop, self).__init__(need_top_grad=True)

        def list_arguments(self):
            return ['data']

        def list_outputs(self):
            return ['output']

        def infer_shape(self, in_shape):
            # inputs, outputs, aux
            return [in_shape[0]], [in_shape[0]], []

        def create_operator(self, ctx, shapes, dtypes):
            return MyAdd()

    @mx.operator.register('MyAdd2')
    class MyAdd2Prop(mx.operator.CustomOpProp):
        def __init__(self):
            super(MyAdd2Prop, self).__init__(need_top_grad=True)

        def list_arguments(self):
            return ['data']

        def list_outputs(self):
            return ['output']

        def infer_shape(self, in_shape):
            # inputs, outputs, aux
            return [in_shape[0]], [in_shape[0]], []

        def create_operator(self, ctx, shapes, dtypes):
            return MyAdd()

    if file_name is None:
        file_name = 'test_custom_operator_profiling_multiple_custom_ops_imperative.json'
    enable_profiler(profile_filename=file_name, run=True, continuous_dump=True,
                    aggregate_stats=True)
    inp = mx.nd.zeros(shape=(100, 100))
    if mode == 'imperative':
        x = inp + 1
        y = mx.nd.Custom(inp, op_type='MyAdd1')
        z = mx.nd.Custom(inp, op_type='MyAdd2')
    elif mode == 'symbolic':
        a = mx.symbol.Variable('a')
        b = a + 1
        c = mx.symbol.Custom(data=a, op_type='MyAdd1')
        d = mx.symbol.Custom(data=a, op_type='MyAdd2')
        b.bind(mx.cpu(), {'a': inp}).forward()
        c.bind(mx.cpu(), {'a': inp}).forward()
        d.bind(mx.cpu(), {'a': inp}).forward()
    mx.nd.waitall()
    profiler.dump(False)
    debug_str = profiler.dumps(format='json')
    target_dict = json.loads(debug_str)
    # _plus_scalar is called inside MyAdd1 and MyAdd2 as well as outside both custom
    # operators, so the aggregate stats should contain three distinct kinds of
    # _plus_scalar, under the domains "Custom Operator" and "operator".
    assert 'Time' in target_dict and 'Custom Operator' in target_dict['Time'] \
        and 'MyAdd1::pure_python' in target_dict['Time']['Custom Operator'] \
        and 'MyAdd2::pure_python' in target_dict['Time']['Custom Operator'] \
        and 'MyAdd1::_plus_scalar' in target_dict['Time']['Custom Operator'] \
        and 'MyAdd2::_plus_scalar' in target_dict['Time']['Custom Operator'] \
        and '_plus_scalar' not in target_dict['Time']['Custom Operator'] \
        and 'operator' in target_dict['Time'] \
        and '_plus_scalar' in target_dict['Time']['operator']
    profiler.set_state('stop')
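

# Same as the imperative test, but in symbolic mode with operator bulking disabled.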
@unittest.skip("Flaky test https://github.com/apache/incubator-mxnet/issues/15406")
def test_custom_operator_profiling_multiple_custom_ops_symbolic():
    run_in_spawned_process(test_custom_operator_profiling_multiple_custom_ops_imperative,
                           {'MXNET_EXEC_BULK_EXEC_INFERENCE': 0,
                            'MXNET_EXEC_BULK_EXEC_TRAIN': 0},
                           'symbolic',
                           'test_custom_operator_profiling_multiple_custom_ops_symbolic.json')


@unittest.skip("Flaky test https://github.com/apache/incubator-mxnet/issues/15406")
def test_custom_operator_profiling_naive_engine():
    # run the three tests above using Naive Engine
    run_in_spawned_process(test_custom_operator_profiling,
                           {'MXNET_ENGINE_TYPE': "NaiveEngine"},
                           'test_custom_operator_profiling_naive.json')
    run_in_spawned_process(test_custom_operator_profiling_multiple_custom_ops_imperative,
                           {'MXNET_ENGINE_TYPE': "NaiveEngine"},
                           'imperative',
                           'test_custom_operator_profiling_multiple_custom_ops_imperative_naive.json')
    run_in_spawned_process(test_custom_operator_profiling_multiple_custom_ops_imperative,
                           {'MXNET_ENGINE_TYPE': "NaiveEngine",
                            'MXNET_EXEC_BULK_EXEC_INFERENCE': 0,
                            'MXNET_EXEC_BULK_EXEC_TRAIN': 0},
                           'symbolic',
                           'test_custom_operator_profiling_multiple_custom_ops_symbolic_naive.json')


if __name__ == '__main__':
    import nose
    nose.runmodule()