blob: 96045516c69e6b06b336c50a033c26611dbc542f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Tests for individual operators
This module contains operator tests which currently do not exist on
ONNX backend test framework. Once we have PRs on the ONNX repo and get
those PRs merged, this file will get EOL'ed.
"""
# pylint: disable=too-many-locals,wrong-import-position,import-error
from __future__ import absolute_import
import sys
import os
import unittest
import logging
import tarfile
from collections import namedtuple
import numpy as np
import numpy.testing as npt
from onnx import checker, numpy_helper, helper, load_model
from onnx import TensorProto
from mxnet.test_utils import download
from mxnet.contrib import onnx as onnx_mxnet
import mxnet as mx
import backend
CURR_PATH = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
sys.path.insert(0, os.path.join(CURR_PATH, '../../python/unittest'))
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def get_rnd(shape, low=-1.0, high=1.0, dtype=np.float32):
if dtype == np.float32:
return (np.random.uniform(low, high,
np.prod(shape)).reshape(shape).astype(np.float32))
elif dtype == np.int32:
return (np.random.randint(low, high,
np.prod(shape)).reshape(shape).astype(np.float32))
elif dtype == np.bool_:
return np.random.choice(a=[False, True], size=shape).astype(np.float32)
def _fix_attributes(attrs, attribute_mapping):
new_attrs = attrs
attr_modify = attribute_mapping.get('modify', {})
for k, v in attr_modify.items():
new_attrs[v] = new_attrs.pop(k, None)
attr_add = attribute_mapping.get('add', {})
for k, v in attr_add.items():
new_attrs[k] = v
attr_remove = attribute_mapping.get('remove', [])
for k in attr_remove:
if k in new_attrs:
del new_attrs[k]
return new_attrs
def forward_pass(sym, arg, aux, data_names, input_data):
""" Perform forward pass on given data
:param sym: Symbol
:param arg: Arg params
:param aux: Aux params
:param data_names: Input names (list)
:param input_data: Input data (list). If there is only one input,
pass it as a list. For example, if input is [1, 2],
pass input_data=[[1, 2]]
:return: result of forward pass
"""
data_shapes = []
data_forward = []
for idx in range(len(data_names)):
val = input_data[idx]
data_shapes.append((data_names[idx], np.shape(val)))
data_forward.append(mx.nd.array(val))
# create module
mod = mx.mod.Module(symbol=sym, data_names=data_names, context=mx.cpu(), label_names=None)
mod.bind(for_training=False, data_shapes=data_shapes, label_shapes=None)
if not arg and not aux:
mod.init_params()
else:
mod.set_params(arg_params=arg, aux_params=aux,
allow_missing=True, allow_extra=True)
# run inference
batch = namedtuple('Batch', ['data'])
mod.forward(batch(data_forward), is_train=False)
return mod.get_outputs()[0].asnumpy()
def get_input_tensors(input_data):
input_tensor = []
input_names = []
input_sym = []
for idx, ip in enumerate(input_data):
name = "input" + str(idx + 1)
input_sym.append(mx.sym.Variable(name))
input_names.append(name)
input_tensor.append(helper.make_tensor_value_info(name,
TensorProto.FLOAT, shape=np.shape(ip)))
return input_names, input_tensor, input_sym
def get_onnx_graph(testname, input_names, inputs, output_name, output_shape, attr):
outputs = [helper.make_tensor_value_info("output", TensorProto.FLOAT, shape=output_shape)]
nodes = [helper.make_node(output_name, input_names, ["output"], **attr)]
graph = helper.make_graph(nodes, testname, inputs, outputs)
model = helper.make_model(graph)
return model
class TestNode(unittest.TestCase):
""" Tests for models.
Tests are dynamically added.
Therefore edit test_models to add more tests.
"""
def test_import_export(self):
for test in test_cases:
test_name, mxnet_op, onnx_name, inputs, attrs, mxnet_specific, fix_attrs, check_value, check_shape = test
with self.subTest(test_name):
names, input_tensors, inputsym = get_input_tensors(inputs)
if inputs:
test_op = mxnet_op(*inputsym, **attrs)
mxnet_output = forward_pass(test_op, None, None, names, inputs)
outputshape = np.shape(mxnet_output)
else:
test_op = mxnet_op(**attrs)
shape = attrs.get('shape', (1,))
x = mx.nd.zeros(shape, dtype='float32')
xgrad = mx.nd.zeros(shape, dtype='float32')
exe = test_op.bind(ctx=mx.cpu(), args={'x': x}, args_grad={'x': xgrad})
mxnet_output = exe.forward(is_train=False)[0].asnumpy()
outputshape = np.shape(mxnet_output)
if mxnet_specific:
onnxmodelfile = onnx_mxnet.export_model(test_op, {}, [np.shape(ip) for ip in inputs],
np.float32,
onnx_name + ".onnx")
onnxmodel = load_model(onnxmodelfile)
else:
onnx_attrs = _fix_attributes(attrs, fix_attrs)
onnxmodel = get_onnx_graph(test_name, names, input_tensors, onnx_name, outputshape, onnx_attrs)
bkd_rep = backend.prepare(onnxmodel, operation='export')
output = bkd_rep.run(inputs)
if check_value:
npt.assert_almost_equal(output[0], mxnet_output)
if check_shape:
npt.assert_equal(output[0].shape, outputshape)
input1 = get_rnd((1, 10, 2, 3))
ipsym = mx.sym.Variable("input1")
for test in test_scalar_ops:
if test == 'Add':
outsym = 2 + ipsym
if test == "Sub":
outsym = ipsym - 2
if test == "rSub":
outsym = ipsym.__rsub__(2)
if test == "Mul":
outsym = 2 * ipsym
if test == "Div":
outsym = ipsym / 2
if test == "Pow":
outsym = ipsym ** 2
forward_op = forward_pass(outsym, None, None, ['input1'], input1)
converted_model = onnx_mxnet.export_model(outsym, {}, [np.shape(input1)], np.float32,
onnx_file_path=outsym.name + ".onnx")
sym, arg_params, aux_params = onnx_mxnet.import_model(converted_model)
result = forward_pass(sym, arg_params, aux_params, ['input1'], input1)
npt.assert_almost_equal(result, forward_op)
def test_imports(self):
for test in import_test_cases:
test_name, onnx_name, inputs, np_op, attrs = test
with self.subTest(test_name):
names, input_tensors, inputsym = get_input_tensors(inputs)
np_out = [np_op(*inputs, **attrs)]
output_shape = np.shape(np_out)
onnx_model = get_onnx_graph(test_name, names, input_tensors, onnx_name, output_shape, attrs)
bkd_rep = backend.prepare(onnx_model, operation='import')
mxnet_out = bkd_rep.run(inputs)
npt.assert_almost_equal(np_out, mxnet_out, decimal=4)
def test_exports(self):
input_shape = (2,1,3,1)
for test in export_test_cases:
test_name, onnx_name, mx_op, attrs = test
input_sym = mx.sym.var('data')
outsym = mx_op(input_sym, **attrs)
converted_model = onnx_mxnet.export_model(outsym, {}, [input_shape], np.float32,
onnx_file_path=outsym.name + ".onnx")
model = load_model(converted_model)
checker.check_model(model)
# test_case = ("test_case_name", mxnet op, "ONNX_op_name", [input_list], attribute map, MXNet_specific=True/False,
# fix_attributes = {'modify': {mxnet_attr_name: onnx_attr_name},
# 'remove': [attr_name],
# 'add': {attr_name: value},
# check_value=True/False, check_shape=True/False)
test_cases = [
("test_equal", mx.sym.broadcast_equal, "Equal", [get_rnd((1, 3, 4, 5)), get_rnd((1, 5))], {}, False, {}, True,
False),
("test_greater", mx.sym.broadcast_greater, "Greater", [get_rnd((1, 3, 4, 5)), get_rnd((1, 5))], {}, False, {}, True,
False),
("test_less", mx.sym.broadcast_lesser, "Less", [get_rnd((1, 3, 4, 5)), get_rnd((1, 5))], {}, False, {}, True,
False),
("test_and", mx.sym.broadcast_logical_and, "And",
[get_rnd((3, 4, 5), dtype=np.bool_), get_rnd((3, 4, 5), dtype=np.bool_)], {}, False, {}, True, False),
("test_xor", mx.sym.broadcast_logical_xor, "Xor",
[get_rnd((3, 4, 5), dtype=np.bool_), get_rnd((3, 4, 5), dtype=np.bool_)], {}, False, {}, True, False),
("test_or", mx.sym.broadcast_logical_or, "Or",
[get_rnd((3, 4, 5), dtype=np.bool_), get_rnd((3, 4, 5), dtype=np.bool_)], {}, False, {}, True, False),
("test_not", mx.sym.logical_not, "Not", [get_rnd((3, 4, 5), dtype=np.bool_)], {}, False, {}, True, False),
("test_square", mx.sym.square, "Pow", [get_rnd((2, 3), dtype=np.int32)], {}, True, {}, True, False),
("test_spacetodepth", mx.sym.space_to_depth, "SpaceToDepth", [get_rnd((1, 1, 4, 6))],
{'block_size': 2}, False, {}, True, False),
("test_softmax", mx.sym.SoftmaxOutput, "Softmax", [get_rnd((1000, 1000)), get_rnd(1000)],
{'ignore_label': 0, 'use_ignore': False}, True, {}, True, False),
("test_logistic_regression", mx.sym.LogisticRegressionOutput, "Sigmoid",
[get_rnd((1000, 1000)), get_rnd((1000, 1000))], {}, True, {}, True, False),
("test_fullyconnected", mx.sym.FullyConnected, "Gemm", [get_rnd((4, 3)), get_rnd((4, 3)), get_rnd(4)],
{'num_hidden': 4, 'name': 'FC'}, True, {}, True, False),
("test_lppool1", mx.sym.Pooling, "LpPool", [get_rnd((2, 3, 20, 20))],
{'kernel': (4, 5), 'pad': (0, 0), 'stride': (1, 1), 'p_value': 1, 'pool_type': 'lp'}, False,
{'modify': {'kernel': 'kernel_shape', 'pad': 'pads', 'stride': 'strides', 'p_value': 'p'},
'remove': ['pool_type']}, True, False),
("test_lppool2", mx.sym.Pooling, "LpPool", [get_rnd((2, 3, 20, 20))],
{'kernel': (4, 5), 'pad': (0, 0), 'stride': (1, 1), 'p_value': 2, 'pool_type': 'lp'}, False,
{'modify': {'kernel': 'kernel_shape', 'pad': 'pads', 'stride': 'strides', 'p_value': 'p'},
'remove': ['pool_type']}, True, False),
("test_globallppool1", mx.sym.Pooling, "GlobalLpPool", [get_rnd((2, 3, 20, 20))],
{'kernel': (4, 5), 'pad': (0, 0), 'stride': (1, 1), 'p_value': 1, 'pool_type': 'lp', 'global_pool': True}, False,
{'modify': {'p_value': 'p'},
'remove': ['pool_type', 'kernel', 'pad', 'stride', 'global_pool']}, True, False),
("test_globallppool2", mx.sym.Pooling, "GlobalLpPool", [get_rnd((2, 3, 20, 20))],
{'kernel': (4, 5), 'pad': (0, 0), 'stride': (1, 1), 'p_value': 2, 'pool_type': 'lp', 'global_pool': True}, False,
{'modify': {'p_value': 'p'},
'remove': ['pool_type', 'kernel', 'pad', 'stride', 'global_pool']}, True, False),
("test_roipool", mx.sym.ROIPooling, "MaxRoiPool",
[[[get_rnd(shape=(8, 6), low=1, high=100, dtype=np.int32)]], [[0, 0, 0, 4, 4]]],
{'pooled_size': (2, 2), 'spatial_scale': 0.7}, False,
{'modify': {'pooled_size': 'pooled_shape'}}, True, False),
# since results would be random, checking for shape alone
("test_multinomial", mx.sym.sample_multinomial, "Multinomial",
[np.array([0, 0.1, 0.2, 0.3, 0.4]).astype("float32")],
{'shape': (10,)}, False, {'modify': {'shape': 'sample_size'}}, False, True),
("test_random_normal", mx.sym.random_normal, "RandomNormal", [],
{'shape': (2, 2), 'loc': 0, 'scale': 1}, False, {'modify': {'loc': 'mean'}}, False, True),
("test_random_uniform", mx.sym.random_uniform, "RandomUniform", [],
{'shape': (2, 2), 'low': 0.5, 'high': 1.0}, False, {}, False, True)
]
test_scalar_ops = ['Add', 'Sub', 'rSub' 'Mul', 'Div', 'Pow']
# test_case = ("test_case_name", "ONNX_op_name", [input_list], np_op, attribute map)
import_test_cases = [
("test_lpnormalization_default", "LpNormalization", [get_rnd([5, 3, 3, 2])], np.linalg.norm, {'ord':2, 'axis':-1}),
("test_lpnormalization_ord1", "LpNormalization", [get_rnd([5, 3, 3, 2])], np.linalg.norm, {'ord':1, 'axis':-1}),
("test_lpnormalization_ord2", "LpNormalization", [get_rnd([5, 3, 3, 2])], np.linalg.norm, {'ord':2, 'axis':1})
]
# test_case = ("test_case_name", "ONNX_op_name", mxnet_op, attribute map)
export_test_cases = [
("test_expand", "Expand", mx.sym.broadcast_to, {'shape': (2,1,3,1)}),
("test_tile", "Tile", mx.sym.tile, {'reps': (2,3)})
]
if __name__ == '__main__':
unittest.main()