# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Some of the tests using CUDNN require a special GPU instruction called dp4a.
Ref: http://images.nvidia.com/content/pdf/tesla/184457-Tesla-P4-Datasheet-NV-Final-Letter-Web.pdf
"""
import os
import mxnet as mx
import numpy as np
from mxnet.test_utils import assert_almost_equal, assert_exception, rand_ndarray, rand_shape_nd, same, DummyIter
from common import with_seed
from mxnet.module import Module
from mxnet.io import NDArrayIter
def is_test_for_gpu():
return mx.current_context().device_type == 'gpu'
def is_test_for_mkldnn():
return (mx.current_context().device_type == 'cpu'
and os.environ.get('ENABLE_MKLDNN_QUANTIZATION_TEST') == '1')
def is_test_for_native_cpu():
return (mx.current_context().device_type == 'cpu'
and os.environ.get('ENABLE_MKLDNN_QUANTIZATION_TEST') is None)
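# A note on the three gates above: each check below calls one of them first
# and returns early when the current context cannot run the quantized
# operator, printing a 'skipped testing ...' message. MKL-DNN CPU coverage is
# opt-in via the ENABLE_MKLDNN_QUANTIZATION_TEST=1 environment variable; a
# sketch of the typical pattern (illustrative only):
#
#     if is_test_for_native_cpu():
#         print('skipped testing <op> for native cpu since it is not supported yet')
#         return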
@with_seed()
def test_quantize_float32_to_int8():
shape = rand_shape_nd(4)
data = rand_ndarray(shape, 'default', dtype='float32')
min_range = mx.nd.min(data)
max_range = mx.nd.max(data)
qdata, min_val, max_val = mx.nd.contrib.quantize(data, min_range, max_range, out_type='int8')
data_np = data.asnumpy()
min_range = min_range.asscalar()
max_range = max_range.asscalar()
real_range = np.maximum(np.abs(min_range), np.abs(max_range))
quantized_range = 127.0
scale = quantized_range / real_range
assert qdata.dtype == np.int8
assert min_val.dtype == np.float32
assert max_val.dtype == np.float32
assert same(min_val.asscalar(), -real_range)
assert same(max_val.asscalar(), real_range)
qdata_np = (np.sign(data_np) * np.minimum(np.abs(data_np) * scale + 0.5, quantized_range)).astype(np.int8)
assert_almost_equal(qdata.asnumpy(), qdata_np, atol=1)
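# A minimal standalone sketch of the symmetric int8 scheme verified above
# (the helper below is illustrative and not used by the tests): with real
# range r the scale is 127 / r, so r = 2.0 maps x = 1.0 to
# int8(1.0 * 63.5 + 0.5) = 64.
def _quantize_scalar_demo(x, real_range):
    scale = 127.0 / real_range
    return np.int8(np.sign(x) * min(abs(x) * scale + 0.5, 127.0))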
@with_seed()
def test_dequantize_int8_to_float32():
def get_test_data(real_range, qdata_np):
qdata = mx.nd.array(qdata_np, dtype=np.int8)
min_range = mx.nd.array([-real_range], dtype=np.float32)
max_range = mx.nd.array([real_range], dtype=np.float32)
return qdata, min_range, max_range
def baseline_dequantization(qdata, real_range, qdata_np):
quantized_range = 127.0
scale = real_range / quantized_range
data_np = qdata_np * scale
return data_np
def test_nd_array_dequantization(qdata, min_range, max_range, expected_result):
data = mx.nd.contrib.dequantize(qdata, min_range, max_range, out_type='float32')
assert data.dtype == np.float32
assert_almost_equal(data.asnumpy(), expected_result)
def test_symbolic_api_dequantization(qdata, min_range, max_range, expected_result):
sym_data = mx.sym.Variable('data')
sym_min_range = mx.sym.Variable('min_range')
sym_max_range = mx.sym.Variable('max_range')
dequant = mx.sym.contrib.dequantize(sym_data, sym_min_range,
sym_max_range, out_type='float32')
out = dequant.bind(ctx=mx.current_context(),
args={'data':qdata, 'min_range':min_range, 'max_range':max_range})
data = out.forward()[0]
assert data.dtype == np.float32
assert_almost_equal(data.asnumpy(), expected_result)
real_range = 402.3347
shape = rand_shape_nd(4)
qdata_np = np.random.uniform(low=-127, high=127, size=shape).astype(dtype=np.int8)
qdata, min_range, max_range = get_test_data(real_range, qdata_np)
expected_result = baseline_dequantization(qdata, real_range, qdata_np)
# test nd array implementation.
test_nd_array_dequantization(qdata, min_range, max_range, expected_result)
# test symbolic api implementation.
test_symbolic_api_dequantization(qdata, min_range, max_range, expected_result)
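# The inverse mapping used by baseline_dequantization above: an int8 value q
# with real range r dequantizes to q * r / 127. A standalone sketch
# (illustrative only, not used by the tests):
def _dequantize_scalar_demo(q, real_range):
    return float(q) * real_range / 127.0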
@with_seed()
def test_requantize_int32_to_int8():
def quantized_int32_to_float(qdata, min_range, max_range):
assert qdata.dtype == 'int32'
quantized_range = np.iinfo('int32').max
real_range = np.maximum(np.abs(min_range), np.abs(max_range))
scale = float(real_range) / float(quantized_range)
return qdata.astype('float32') * scale
def float_to_quantized_int8(data, min_range, max_range):
assert data.dtype == 'float32'
real_range = np.maximum(np.abs(min_range), np.abs(max_range))
quantized_range = np.iinfo('int8').max
scale = float(quantized_range) / float(real_range)
return (np.sign(data) * np.minimum(np.abs(data) * scale + 0.5, quantized_range)).astype('int8')
def requantize(qdata, min_data, max_data, real_range):
data = quantized_int32_to_float(qdata, min_data, max_data)
output = float_to_quantized_int8(data, -real_range, real_range)
return output, -real_range, real_range
def requantize_baseline(qdata, min_data, max_data, min_calib_range=None, max_calib_range=None):
if min_calib_range is not None and max_calib_range is not None:
real_range = np.maximum(np.abs(min_calib_range), np.abs(max_calib_range))
return requantize(qdata, min_data, max_data, real_range)
else:
min_range = quantized_int32_to_float(np.min(qdata), min_data, max_data)
max_range = quantized_int32_to_float(np.max(qdata), min_data, max_data)
return requantize(qdata, min_data, max_data, np.maximum(np.abs(min_range), np.abs(max_range)))
def check_requantize(shape, min_calib_range=None, max_calib_range=None):
qdata = mx.nd.random.uniform(low=-1000.0, high=1000.0, shape=shape).astype('int32')
min_range = mx.nd.array([-1010.0])
max_range = mx.nd.array([1020.0])
if min_calib_range is None or max_calib_range is None:
qdata_int8, min_output, max_output = mx.nd.contrib.requantize(qdata, min_range, max_range)
else:
qdata_int8, min_output, max_output = mx.nd.contrib.requantize(qdata, min_range, max_range,
min_calib_range=min_calib_range,
max_calib_range=max_calib_range)
qdata_int8_np, min_output_np, max_output_np = requantize_baseline(qdata.asnumpy(), min_range.asscalar(),
max_range.asscalar(),
min_calib_range=min_calib_range,
max_calib_range=max_calib_range)
assert_almost_equal(qdata_int8.asnumpy(), qdata_int8_np, atol=1)
assert_almost_equal(min_output.asnumpy(), np.array([min_output_np]))
assert_almost_equal(max_output.asnumpy(), np.array([max_output_np]))
def check_requantize_with_symbol(shape, min_calib_range=None, max_calib_range=None):
qdata = mx.nd.random.uniform(low=-1000.0, high=1000.0, shape=shape).astype('int32')
min_range = mx.nd.array([-1010.0])
max_range = mx.nd.array([1020.0])
sym_data = mx.sym.Variable('data')
sym_min_range = mx.sym.Variable('min_range')
sym_max_range = mx.sym.Variable('max_range')
if min_calib_range is None or max_calib_range is None:
requant = mx.sym.contrib.requantize(sym_data, sym_min_range, sym_max_range)
out = requant.bind(ctx=mx.current_context(),
args={'data':qdata, 'min_range':min_range,
'max_range':max_range})
qdata_int8, min_output, max_output = out.forward()
else:
requant = mx.sym.contrib.requantize(sym_data, sym_min_range, sym_max_range,
min_calib_range=min_calib_range,
max_calib_range=max_calib_range)
out = requant.bind(ctx=mx.current_context(), args={'data':qdata, 'min_range':min_range,
'max_range':max_range})
qdata_int8, min_output, max_output = out.forward()
qdata_int8_np, min_output_np, max_output_np = requantize_baseline(qdata.asnumpy(), min_range.asscalar(),
max_range.asscalar(),
min_calib_range=min_calib_range,
max_calib_range=max_calib_range)
assert_almost_equal(qdata_int8.asnumpy(), qdata_int8_np)
assert_almost_equal(min_output.asnumpy(), np.array([min_output_np]))
assert_almost_equal(max_output.asnumpy(), np.array([max_output_np]))
# Test with symbol API.
check_requantize_with_symbol((3, 4, 10, 10))
check_requantize_with_symbol((32, 3, 23, 23))
check_requantize_with_symbol((3, 4, 10, 10), min_calib_range=-1050.0, max_calib_range=1040.0)
check_requantize_with_symbol((32, 3, 23, 23), min_calib_range=-134.349, max_calib_range=523.43)
# Test with nd array API.
check_requantize((3, 4, 10, 10))
check_requantize((32, 3, 23, 23))
check_requantize((3, 4, 10, 10), min_calib_range=-1050.0, max_calib_range=1040.0)
check_requantize((32, 3, 23, 23), min_calib_range=-134.349, max_calib_range=523.43)
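# Requantization, as exercised above, is a two-step mapping: scale the int32
# value back to fp32 using the int32 range, then quantize that fp32 value
# into int8 with the (possibly calibrated) output range. A standalone sketch
# (illustrative only, not used by the tests):
def _requantize_scalar_demo(q32, real_range_in, real_range_out):
    x = float(q32) * real_range_in / np.iinfo('int32').max  # int32 -> fp32
    scale = np.iinfo('int8').max / real_range_out           # fp32 -> int8
    return np.int8(np.sign(x) * min(abs(x) * scale + 0.5, np.iinfo('int8').max))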
@with_seed()
def test_quantized_conv():
def check_quantized_conv(data_shape, kernel, num_filter, pad, stride, no_bias, qdtype):
if is_test_for_native_cpu():
print('skipped testing quantized_conv for native cpu since it is not supported yet')
return
elif qdtype == 'int8' and is_test_for_mkldnn():
print('skipped testing quantized_conv for mkldnn cpu int8 since it is not supported yet')
return
elif qdtype == 'uint8' and is_test_for_gpu():
print('skipped testing quantized_conv for gpu uint8 since it is not supported yet')
return
# run fp32 conv
data = mx.sym.Variable(name='data', shape=data_shape, dtype='float32')
conv2d = mx.sym.Convolution(data=data, kernel=kernel, num_filter=num_filter, pad=pad, stride=stride,
no_bias=no_bias, cudnn_off=False, name='conv2d')
arg_shapes, _, _ = conv2d.infer_shape(data=data_shape)
arg_names = conv2d.list_arguments()
conv_exe_fp32 = conv2d.simple_bind(ctx=mx.current_context(), grad_req='null')
if qdtype == 'uint8':
data_low = 0.0
data_high = 127.0
else:
data_low = -127.0
data_high = 127.0
conv_exe_fp32.arg_dict[arg_names[0]][:] = mx.nd.random.uniform(low=data_low, high=data_high,
shape=data_shape).astype('int32')
conv_exe_fp32.arg_dict[arg_names[1]][:] = mx.nd.random.uniform(low=-127.0, high=127.0,
shape=arg_shapes[1]).astype('int32')
if not no_bias:
conv_exe_fp32.arg_dict[arg_names[2]][:] = mx.nd.random.uniform(low=-127.0, high=127.0,
shape=arg_shapes[2]).astype('int32')
output = conv_exe_fp32.forward()[0]
# run quantized conv
qdata = mx.sym.Variable(name='qdata', shape=data_shape, dtype=qdtype)
qweight = mx.sym.Variable(name='qweight', dtype='int8')
min_data = mx.sym.Variable(name='min_data')
max_data = mx.sym.Variable(name='max_data')
min_weight = mx.sym.Variable(name='min_weight')
max_weight = mx.sym.Variable(name='max_weight')
quantized_conv2d = mx.sym.contrib.quantized_conv(data=qdata, weight=qweight, min_data=min_data,
max_data=max_data, min_weight=min_weight,
max_weight=max_weight, kernel=kernel,
num_filter=num_filter, pad=pad, stride=stride,
no_bias=no_bias)
qarg_names = quantized_conv2d.list_arguments()
type_dict = None
if not no_bias:
type_dict = {qarg_names[2]: 'int8'}
conv_exe_int8 = quantized_conv2d.simple_bind(ctx=mx.current_context(), type_dict=type_dict, grad_req='null')
conv_exe_int8.arg_dict[qarg_names[0]][:] = conv_exe_fp32.arg_dict[arg_names[0]].astype(qdtype)
conv_exe_int8.arg_dict[qarg_names[1]][:] = conv_exe_fp32.arg_dict[arg_names[1]].astype('int8')
quantized_range = 127.0
if no_bias:
conv_exe_int8.arg_dict[qarg_names[2]][:] = -quantized_range
conv_exe_int8.arg_dict[qarg_names[3]][:] = quantized_range
conv_exe_int8.arg_dict[qarg_names[4]][:] = -quantized_range
conv_exe_int8.arg_dict[qarg_names[5]][:] = quantized_range
else:
conv_exe_int8.arg_dict[qarg_names[2]][:] = conv_exe_fp32.arg_dict[arg_names[2]].astype('int8')
conv_exe_int8.arg_dict[qarg_names[3]][:] = -quantized_range
conv_exe_int8.arg_dict[qarg_names[4]][:] = quantized_range
conv_exe_int8.arg_dict[qarg_names[5]][:] = -quantized_range
conv_exe_int8.arg_dict[qarg_names[6]][:] = quantized_range
conv_exe_int8.arg_dict[qarg_names[7]][:] = -quantized_range
conv_exe_int8.arg_dict[qarg_names[8]][:] = quantized_range
qoutput, min_range, max_range = conv_exe_int8.forward()
if no_bias:
assert_almost_equal(output.asnumpy(), qoutput.asnumpy())
else:
# adding a bias introduces rounding error; tolerate an absolute
# difference of at most 2 between the fp32 and int8 outputs
diff = mx.nd.abs(output - qoutput.astype(output.dtype))
cond = mx.nd.lesser(2, diff).sum().asscalar()
assert cond == 0
for qdtype in ['int8', 'uint8']:
check_quantized_conv((3, 4, 28, 28), (3, 3), 128, (1, 1), (1, 1), True, qdtype)
check_quantized_conv((3, 4, 28, 28), (3, 3), 128, (1, 1), (1, 1), False, qdtype)
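# Note on the strategy used by check_quantized_conv (and the similar checks
# below): the fp32 executor is fed integer-valued inputs, so casting the same
# tensors to int8/uint8 is lossless and the fp32 and quantized paths are
# expected to agree exactly, up to the small bias-induced tolerance above.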
@with_seed()
def test_quantized_elemwise_add():
def check_quantized_elemwise_add(data_shape, qtype):
if is_test_for_native_cpu():
print('skipped testing quantized_elemwise_add for native cpu since it is not supported yet')
return
elif qtype != 'uint8' and qtype != 'int8':
print('skipped testing quantized_elemwise_add for not supported data type')
return
elif is_test_for_gpu():
print('skipped testing quantized_elemwise_add for gpu since it is not supported yet')
return
dataA = mx.sym.Variable(name='dataA', shape=data_shape, dtype='float32')
dataB = mx.sym.Variable(name='dataB', shape=data_shape, dtype='float32')
elemwise_add_fp32 = mx.sym.elemwise_add(dataA, dataB)
arg_names = elemwise_add_fp32.list_arguments()
elemwise_add_fp32_exe = elemwise_add_fp32.simple_bind(ctx=mx.current_context(), grad_req='null')
if qtype == 'uint8':
data_low = 0.0
data_high = 255.0
else:
data_low = -127.0
data_high = 127.0
dataA_val = mx.nd.random.uniform(low=data_low, high=data_high, shape=data_shape).astype('int32')
dataB_val = mx.nd.random.uniform(low=data_low, high=data_high, shape=data_shape).astype('int32')
elemwise_add_fp32_exe.arg_dict[arg_names[0]][:] = dataA_val
elemwise_add_fp32_exe.arg_dict[arg_names[1]][:] = dataB_val
output = elemwise_add_fp32_exe.forward()[0]
qdataA = mx.sym.Variable(name='qdataA', shape=data_shape, dtype=qtype)
qdataB = mx.sym.Variable(name='qdataB', shape=data_shape, dtype=qtype)
min_dataA = mx.sym.Variable(name='min_dataA')
max_dataA = mx.sym.Variable(name='max_dataA')
min_dataB = mx.sym.Variable(name='min_dataB')
max_dataB = mx.sym.Variable(name='max_dataB')
quantized_elemwise_add = mx.sym.contrib.quantized_elemwise_add(qdataA, qdataB, min_dataA, max_dataA, min_dataB, max_dataB)
elemwise_add_int8_exe = quantized_elemwise_add.simple_bind(ctx=mx.current_context(), grad_req='null')
qarg_names = quantized_elemwise_add.list_arguments()
elemwise_add_int8_exe.arg_dict[qarg_names[0]][:] = elemwise_add_fp32_exe.arg_dict[arg_names[0]].astype(qtype)
elemwise_add_int8_exe.arg_dict[qarg_names[1]][:] = elemwise_add_fp32_exe.arg_dict[arg_names[1]].astype(qtype)
quantized_range = 127.0
elemwise_add_int8_exe.arg_dict[qarg_names[2]][:] = data_low
elemwise_add_int8_exe.arg_dict[qarg_names[3]][:] = data_high
elemwise_add_int8_exe.arg_dict[qarg_names[4]][:] = data_low
elemwise_add_int8_exe.arg_dict[qarg_names[5]][:] = data_high
qoutput, min_range, max_range = elemwise_add_int8_exe.forward()
max_val = max_range.asnumpy().tolist()[0]
fp32_rslt = output.asnumpy()
# dequantize the quantized sum back to fp32 and compare it with the fp32 result
int8_rslt = qoutput.asnumpy() * max_val / 0x7fffffff
assert_almost_equal(fp32_rslt, int8_rslt, atol=1e-4)
for qtype in ['int8', 'uint8']:
check_quantized_elemwise_add((4, 6), qtype)
check_quantized_elemwise_add((13, 74, 52), qtype)
check_quantized_elemwise_add((3, 4, 56, 56), qtype)
check_quantized_elemwise_add((32, 56, 64, 11), qtype)
@with_seed()
def test_quantized_pooling():
def check_quantized_pooling(data_shape, kernel, pool_type, pad, stride, global_pool, qdtype, convention='valid'):
if is_test_for_native_cpu():
print('skipped testing quantized_pooling for native cpu since it is not supported yet')
return
elif qdtype == 'uint8' and is_test_for_gpu():
print('skipped testing quantized_pooling for gpu uint8 since it is not supported yet')
return
data = mx.sym.Variable(name='data', shape=data_shape, dtype='float32')
pooling_fp32 = mx.sym.Pooling(data=data, kernel=kernel, pad=pad, stride=stride,
pool_type=pool_type, global_pool=global_pool, cudnn_off=False,
pooling_convention=convention)
arg_shapes, _, _ = pooling_fp32.infer_shape(data=data_shape)
arg_names = pooling_fp32.list_arguments()
pooling_fp32_exe = pooling_fp32.simple_bind(ctx=mx.current_context(), grad_req='null')
if qdtype == 'uint8':
data_low = 0.0
data_high = 127.0
else:
data_low = -127.0
data_high = 127.0
pooling_fp32_exe.arg_dict[arg_names[0]][:] = mx.nd.random.uniform(low=data_low, high=data_high,
shape=data_shape).astype('int32')
output = pooling_fp32_exe.forward()[0]
qdata = mx.sym.Variable(name='qdata', shape=data_shape, dtype=qdtype)
min_data = mx.sym.Variable(name='min_data')
max_data = mx.sym.Variable(name='max_data')
quantized_pooling = mx.sym.contrib.quantized_pooling(data=qdata, min_data=min_data,
max_data=max_data, kernel=kernel,
pad=pad, stride=stride, pool_type=pool_type,
global_pool=global_pool,
pooling_convention=convention)
pooling_int8_exe = quantized_pooling.simple_bind(ctx=mx.current_context(), grad_req='null')
qarg_names = quantized_pooling.list_arguments()
pooling_int8_exe.arg_dict[qarg_names[0]][:] = pooling_fp32_exe.arg_dict[arg_names[0]].astype(qdtype)
quantized_range = 127.0
pooling_int8_exe.arg_dict[qarg_names[1]][:] = -quantized_range
pooling_int8_exe.arg_dict[qarg_names[2]][:] = quantized_range
qoutput, min_range, max_range = pooling_int8_exe.forward()
if pool_type == 'max':
assert_almost_equal(output.asnumpy(), qoutput.asnumpy())
elif pool_type == 'avg': # for avg pooling, fp32 and int8 may be different due to rounding errors
diff = mx.nd.abs(output - qoutput.astype(output.dtype))
cond = mx.nd.lesser(2, diff).sum().asscalar()
assert cond == 0
for qdtype in ['int8', 'uint8']:
check_quantized_pooling((3, 4, 56, 56), (3, 3), 'max', (0, 0), (2, 2), False, qdtype)
check_quantized_pooling((3, 4, 56, 56), (3, 3), 'max', (0, 0), (2, 2), True, qdtype)
check_quantized_pooling((3, 512, 7, 7), (7, 7), 'avg', (0, 0), (1, 1), False, qdtype)
check_quantized_pooling((3, 512, 7, 7), (7, 7), 'avg', (0, 0), (1, 1), True, qdtype)
check_quantized_pooling((3, 4, 56, 56), (3, 3), 'max', (0, 0), (2, 2), False, qdtype, 'full')
check_quantized_pooling((3, 4, 56, 56), (3, 3), 'max', (0, 0), (2, 2), True, qdtype, 'full')
check_quantized_pooling((3, 512, 7, 7), (7, 7), 'avg', (0, 0), (1, 1), False, qdtype, 'full')
check_quantized_pooling((3, 512, 7, 7), (7, 7), 'avg', (0, 0), (1, 1), True, qdtype, 'full')
@with_seed()
def test_quantized_fc():
def check_quantized_fc(data_shape, num_hidden, no_bias, qdtype, flatten=True):
if is_test_for_native_cpu():
has_mkl = 'MKL' in os.environ.get('BUILD_TAG', '')
if not has_mkl:
    print('skipped testing quantized_fc on cpu since s8u8s32 is only supported by MKL BLAS library')
    return
elif qdtype == 'uint8' and is_test_for_gpu():
print('skipped testing quantized_fc for gpu uint8 since it is not supported yet')
return
def maxabs(a, b):
return mx.nd.maximum(mx.nd.abs(a), mx.nd.abs(b))
data = mx.sym.Variable(name='data', shape=data_shape, dtype='float32')
fc_fp32 = mx.sym.FullyConnected(data=data, num_hidden=num_hidden, no_bias=no_bias, flatten=flatten)
arg_shapes, _, _ = fc_fp32.infer_shape(data=data_shape)
arg_names = fc_fp32.list_arguments()
fc_fp32_exe = fc_fp32.simple_bind(ctx=mx.current_context(), grad_req='null')
int8_range = 127.0
if qdtype == 'uint8':
data_low = 0.0
data_high = 63.0
quantized_range = 255.0
else:
data_low = -63.0
data_high = 63.0
quantized_range = 127.0
data = mx.nd.random.uniform(low=data_low, high=data_high,
shape=data_shape).astype('int32')
weight = mx.nd.random.uniform(low=data_low, high=data_high,
shape=arg_shapes[1]).astype('int32')
fc_fp32_exe.arg_dict[arg_names[0]][:] = data
fc_fp32_exe.arg_dict[arg_names[1]][:] = weight
data_min = mx.nd.min(data).astype('float32')
data_max = mx.nd.max(data).astype('float32')
weight_min = mx.nd.min(weight).astype('float32')
weight_max = mx.nd.max(weight).astype('float32')
data_range = maxabs(data_min, data_max)
weight_range = maxabs(weight_min, weight_max)
if not no_bias:
bias = mx.nd.random.uniform(low=data_low, high=data_high,
shape=arg_shapes[2]).astype('int32')
bias_min = mx.nd.min(bias).astype('float32')
bias_max = mx.nd.max(bias).astype('float32')
bias_range = maxabs(bias_min, bias_max)
bias_scale = int8_range / bias_range
data_scale = quantized_range / data_range
weight_scale = int8_range / weight_range
bias_int32_rescale = data_scale * weight_scale / bias_scale
new_bias = mx.nd.cast(bias, dtype='float32') * bias_int32_rescale
fc_fp32_exe.arg_dict[arg_names[2]][:] = new_bias.astype('int32')
output = fc_fp32_exe.forward()[0]
qdata = mx.sym.Variable(name='qdata', shape=data_shape, dtype=qdtype)
fc_int8 = mx.sym.contrib.quantized_fully_connected(data=qdata, num_hidden=num_hidden,
no_bias=no_bias, flatten=flatten)
qarg_names = fc_int8.list_arguments()
type_dict = {qarg_names[1]: 'int8'}
if not no_bias:
type_dict.update({qarg_names[2]: 'int8'})
fc_int8_exe = fc_int8.simple_bind(ctx=mx.current_context(), type_dict=type_dict, grad_req='null')
fc_int8_exe.arg_dict[qarg_names[0]][:] = fc_fp32_exe.arg_dict[arg_names[0]].astype(qdtype)
fc_int8_exe.arg_dict[qarg_names[1]][:] = fc_fp32_exe.arg_dict[arg_names[1]].astype('int8')
if no_bias:
fc_int8_exe.arg_dict[qarg_names[2]][:] = -data_range
fc_int8_exe.arg_dict[qarg_names[3]][:] = data_range
fc_int8_exe.arg_dict[qarg_names[4]][:] = -weight_range
fc_int8_exe.arg_dict[qarg_names[5]][:] = weight_range
else:
fc_int8_exe.arg_dict[qarg_names[2]][:] = bias.astype('int8')
fc_int8_exe.arg_dict[qarg_names[3]][:] = -data_range
fc_int8_exe.arg_dict[qarg_names[4]][:] = data_range
fc_int8_exe.arg_dict[qarg_names[5]][:] = -weight_range
fc_int8_exe.arg_dict[qarg_names[6]][:] = weight_range
fc_int8_exe.arg_dict[qarg_names[7]][:] = -bias_range
fc_int8_exe.arg_dict[qarg_names[8]][:] = bias_range
qoutput, min_range, max_range = fc_int8_exe.forward()
if no_bias:
assert_almost_equal(output.asnumpy(), qoutput.asnumpy())
else:
# adding a bias introduces rounding error; tolerate an absolute
# difference of at most 2 between the fp32 and int8 outputs
diff = mx.nd.abs(output - qoutput.astype(output.dtype))
cond = mx.nd.lesser(2, diff).sum().asscalar()
assert cond == 0
for qdtype in ['int8', 'uint8']:
if is_test_for_mkldnn():
check_quantized_fc((32, 512, 2), 100, True, qdtype, flatten=False)
check_quantized_fc((32, 512, 2), 100, False, qdtype, flatten=False)
check_quantized_fc((32, 512, 2, 2), 100, True, qdtype, flatten=False)
check_quantized_fc((32, 512, 2, 2), 100, False, qdtype, flatten=False)
check_quantized_fc((32, 512, 2, 2), 100, True, qdtype)
check_quantized_fc((32, 111, 2, 2), 100, True, qdtype)
check_quantized_fc((32, 512, 2, 2), 100, False, qdtype)
check_quantized_fc((32, 111, 2, 2), 100, False, qdtype)
check_quantized_fc((256, 2048, 2, 2), 800, False, qdtype)
check_quantized_fc((256, 111, 2, 2), 800, False, qdtype)
check_quantized_fc((256, 2048, 2, 2), 800, True, qdtype)
check_quantized_fc((256, 111, 2, 2), 800, True, qdtype)
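# check_quantized_fc rescales the fp32 bias so that both executors accumulate
# the same effective bias: the int32 accumulator carries a scale of
# data_scale * weight_scale while the int8 bias carries bias_scale, hence the
# factor data_scale * weight_scale / bias_scale. A standalone sketch
# (illustrative only, not used by the tests):
def _rescale_bias_demo(bias_fp32, data_range, weight_range, bias_range,
                       data_qrange=127.0):
    data_scale = data_qrange / data_range  # data_qrange is 255.0 for uint8 data
    weight_scale = 127.0 / weight_range
    bias_scale = 127.0 / bias_range
    return bias_fp32 * (data_scale * weight_scale / bias_scale)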
@with_seed()
def test_quantized_flatten():
def check_quantized_flatten(shape, qdtype):
if qdtype == 'uint8':
data_low = 0.0
data_high = 127.0
else:
data_low = -127.0
data_high = 127.0
qdata = mx.nd.random.uniform(low=data_low, high=data_high, shape=shape).astype(qdtype)
min_data = mx.nd.array([-1023.343], dtype='float32')
max_data = mx.nd.array([2343.324275], dtype='float32')
qoutput, min_output, max_output = mx.nd.contrib.quantized_flatten(qdata, min_data, max_data)
assert qoutput.ndim == 2
assert qoutput.shape[0] == qdata.shape[0]
assert qoutput.shape[1] == np.prod(qdata.shape[1:])
assert same(qdata.asnumpy().flatten(), qoutput.asnumpy().flatten())
assert same(min_data.asnumpy(), min_output.asnumpy())
assert same(max_data.asnumpy(), max_output.asnumpy())
for qdtype in ['int8', 'uint8']:
check_quantized_flatten((10,), qdtype)
check_quantized_flatten((10, 15), qdtype)
check_quantized_flatten((10, 15, 18), qdtype)
check_quantized_flatten((3, 4, 23, 23), qdtype)
@with_seed()
def test_quantized_act():
def check_quantized_act(data_shape, qdtype):
if is_test_for_native_cpu():
print('skipped testing quantized_act for native cpu since it is not supported yet')
return
elif qdtype == 'int8' and is_test_for_mkldnn():
print('skipped testing quantized_act for mkldnn cpu int8 since it is not supported yet')
return
elif is_test_for_gpu():
print('skipped testing quantized_act for gpu since it is not supported yet')
return
data = mx.sym.Variable(name='data', shape=data_shape, dtype='float32')
act_fp32 = mx.sym.Activation(data=data, act_type='relu', name='relu')
arg_shapes, _, _ = act_fp32.infer_shape(data=data_shape)
arg_names = act_fp32.list_arguments()
act_fp32_exe = act_fp32.simple_bind(ctx=mx.current_context(), grad_req='null')
if qdtype == 'uint8':
data_low = 0.0
data_high = 127.0
else:
data_low = -127.0
data_high = 127.0
act_fp32_exe.arg_dict[arg_names[0]][:] = mx.nd.random.uniform(low=data_low,
high=data_high, shape=data_shape).astype(qdtype)
output = act_fp32_exe.forward()[0]
qdata = mx.sym.Variable(name='qdata', shape=data_shape, dtype=qdtype)
min_data = mx.sym.Variable(name='min_data')
max_data = mx.sym.Variable(name='max_data')
quantized_act = mx.sym.contrib.quantized_act(data=qdata, min_data=min_data, max_data=max_data, act_type='relu')
act_int8_exe = quantized_act.simple_bind(ctx=mx.current_context(), grad_req='null')
qarg_names = quantized_act.list_arguments()
act_int8_exe.arg_dict[qarg_names[0]][:] = act_fp32_exe.arg_dict[arg_names[0]].astype(qdtype)
quantized_range_min = mx.nd.min(act_int8_exe.arg_dict[qarg_names[0]][:])
quantized_range_max = mx.nd.max(act_int8_exe.arg_dict[qarg_names[0]][:])
act_int8_exe.arg_dict[qarg_names[1]][:] = quantized_range_min.astype(qdtype)
act_int8_exe.arg_dict[qarg_names[2]][:] = quantized_range_max.astype(qdtype)
qoutput, min_range, max_range = act_int8_exe.forward()
assert_almost_equal(output.asnumpy(), qoutput.asnumpy())
assert_almost_equal(min_range.asscalar(), quantized_range_min.asscalar())
assert_almost_equal(max_range.asscalar(), quantized_range_max.asscalar())
for qdtype in ['int8', 'uint8']:
check_quantized_act((10,), qdtype)
check_quantized_act((10, 15), qdtype)
check_quantized_act((10, 15, 18), qdtype)
check_quantized_act((3, 4, 23, 23), qdtype)
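# Why check_quantized_act can compare outputs exactly: the int8/uint8 mapping
# is monotone and preserves zero and sign, so relu applied to quantized
# values equals the quantization of relu applied to the original values, and
# no rounding tolerance is needed.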
@with_seed()
def test_quantize_params():
data = mx.sym.Variable('data')
conv = mx.sym.Convolution(data, kernel=(1, 1), num_filter=2048, name='conv')
sym = mx.sym.BatchNorm(data=conv, eps=2e-05, fix_gamma=False, momentum=0.9, use_global_stats=False, name='bn')
offline_params = [name for name in sym.list_arguments()
if not name.startswith('data') and not name.endswith('label')]
params = {}
for name in offline_params:
params[name] = mx.nd.uniform(shape=(2, 2))
qsym = mx.contrib.quant._quantize_symbol(sym, offline_params=offline_params)
qparams = mx.contrib.quant._quantize_params(qsym, params, th_dict={})
param_names = params.keys()
qparam_names = qparams.keys()
for name in qparam_names:
if name.startswith('bn'):
assert name in param_names
elif name.startswith('conv'):
assert name not in param_names
assert name.find('quantize') != -1
def get_fp32_sym():
data = mx.sym.Variable('data')
conv = mx.sym.Convolution(data, kernel=(1, 1), num_filter=16, name='conv')
bn = mx.sym.BatchNorm(data=conv, eps=2e-05, fix_gamma=False, momentum=0.9, use_global_stats=False, name='bn')
act = mx.sym.Activation(data=bn, act_type='relu', name='relu')
pool = mx.sym.Pooling(act, kernel=(4, 4), pool_type='avg', name='pool')
fc = mx.sym.FullyConnected(pool, num_hidden=10, flatten=True, name='fc')
sym = mx.sym.SoftmaxOutput(fc, grad_scale=1, ignore_label=-1, multi_output=False,
out_grad=False, preserve_shape=False, use_ignore=False, name='softmax')
return sym
def get_fp32_residual():
data = mx.sym.Variable('data')
conv0 = mx.sym.Convolution(data=data, num_filter=4, kernel=(1,1), pad=(0,0),
no_bias=True, name='conv0')
bn = mx.sym.BatchNorm(data=conv0, fix_gamma=False, eps=2e-5, momentum=0.9, name='bn')
sum0 = mx.sym.elemwise_add(bn, data, name='sum0')
act0 = mx.sym.Activation(data=sum0, act_type='relu', name='relu0')
pool0 = mx.sym.Pooling(act0, kernel=(4, 4), pool_type='avg', name='pool0')
conv1 = mx.sym.Convolution(data=pool0, num_filter=4, kernel=(1,1), pad=(0,0),
no_bias=False, name='conv1')
act1 = mx.sym.Activation(data=conv1, act_type='relu', name='relu1')
pool1 = mx.sym.Pooling(act1, kernel=(4, 4), pool_type='avg', name='pool1')
fc = mx.sym.FullyConnected(pool1, num_hidden=10, flatten=True, name='fc')
sym = mx.sym.SoftmaxOutput(fc, grad_scale=1, ignore_label=-1, multi_output=False,
out_grad=False, preserve_shape=False, use_ignore=False, name='softmax')
return sym
def get_fp32_sym_with_multiple_outputs(length=1):
data = mx.sym.Variable('data')
inputs = list(mx.sym.split(data, axis=0, num_outputs=length, squeeze_axis=1, name='split'))
_conv_outs = []
for i in range(length):
_conv_outs.append(mx.sym.Convolution(data=inputs[i], kernel=(1, 1), num_filter=16, name='conv_{0}'.format(i)))
conv_out = [mx.sym.expand_dims(i, axis=0) for i in _conv_outs]
conv_out = mx.sym.Concat(*conv_out, dim=0, name='concat')
reshape_out = mx.sym.reshape(data=conv_out, shape=(length, -1), name='reshape')
fc_out = mx.sym.FullyConnected(reshape_out, num_hidden=10, flatten=True, name='fc')
sym = mx.sym.SoftmaxOutput(fc_out, grad_scale=1, ignore_label=-1, multi_output=False,
out_grad=False, preserve_shape=False, use_ignore=False, name='softmax')
return sym
@with_seed()
def test_quantize_model():
def check_quantize_model(qdtype):
if is_test_for_native_cpu():
print('skipped testing quantize_model for native cpu since it is not supported yet')
return
elif qdtype == 'int8' and is_test_for_mkldnn():
print('skipped testing quantize_model for mkldnn cpu int8 since it is not supported yet')
return
elif qdtype == 'uint8' and is_test_for_gpu():
print('skipped testing quantize_model for gpu uint8 since it is not supported yet')
return
def check_params(params, qparams, qsym=None):
if qsym is None:
assert len(params) == len(qparams)
for k, v in params.items():
assert k in qparams
assert same(v.asnumpy(), qparams[k].asnumpy())
else:
qparams_ground_truth = mx.contrib.quant._quantize_params(qsym, params, th_dict={})
assert len(qparams) == len(qparams_ground_truth)
for k, v in qparams_ground_truth.items():
assert k in qparams
assert same(v.asnumpy(), qparams[k].asnumpy())
def check_qsym_calibrated(qsym):
attrs = qsym.attr_dict()
for k, v in attrs.items():
if k.find('requantize_') != -1:
assert 'min_calib_range' in v
assert 'max_calib_range' in v
def check_qsym_qdtype(qsym, qdtype):
attrs = qsym.attr_dict()
for k, v in attrs.items():
if k.find('_quantize') != -1:
assert 'out_type' in v
assert v['out_type'] == qdtype
sym = get_fp32_sym()
batch_size = 4
label_shape = (batch_size, 10)
data_shape = (batch_size, 4, 10, 10)
length = batch_size # specify num of outputs from split op
msym = get_fp32_sym_with_multiple_outputs(length)
msym_label_shape = (length, 10)
msym_data_shape = (length, 4, 4, 10, 10)
for s, dshape, lshape in zip((sym, msym), (data_shape, msym_data_shape),
(label_shape, msym_label_shape)):
mod = Module(symbol=s)
mod.bind(data_shapes=[('data', dshape)], label_shapes=[('softmax_label', lshape)])
mod.init_params()
arg_params, aux_params = mod.get_params()
qsym, qarg_params, qaux_params = mx.contrib.quant.quantize_model(sym=s,
arg_params=arg_params,
aux_params=aux_params,
ctx=mx.current_context(),
quantized_dtype=qdtype,
calib_mode='none')
check_params(arg_params, qarg_params, qsym)
check_params(aux_params, qaux_params)
calib_data = mx.nd.random.uniform(shape=dshape)
calib_data = NDArrayIter(data=calib_data, batch_size=batch_size)
calib_data = DummyIter(calib_data)
qsym, qarg_params, qaux_params = mx.contrib.quant.quantize_model(sym=s,
arg_params=arg_params,
aux_params=aux_params,
ctx=mx.current_context(),
quantized_dtype=qdtype,
calib_mode='naive',
calib_data=calib_data,
num_calib_examples=20)
check_params(arg_params, qarg_params, qsym)
check_params(aux_params, qaux_params)
check_qsym_calibrated(qsym)
check_qsym_qdtype(qsym, qdtype)
for qdtype in ['int8', 'uint8']:
check_quantize_model(qdtype)
@with_seed()
def test_quantize_model_with_forward():
def check_quantize_model(qdtype):
if is_test_for_native_cpu():
print('skipped testing test_quantize_model_with_forward for native cpu since it is not supported yet')
return
elif qdtype == 'uint8' and is_test_for_gpu():
print('skipped testing test_quantize_model_with_forward for gpu uint8 since it is not supported yet')
return
def check_params(params, qparams, qsym=None):
if qsym is None:
assert len(params) == len(qparams)
for k, v in params.items():
assert k in qparams
assert same(v.asnumpy(), qparams[k].asnumpy())
else:
qparams_ground_truth = mx.contrib.quant._quantize_params(qsym, params, th_dict={})
assert len(qparams) == len(qparams_ground_truth)
for k, v in qparams_ground_truth.items():
assert k in qparams
assert same(v.asnumpy(), qparams[k].asnumpy())
def check_qsym_calibrated(qsym):
attrs = qsym.attr_dict()
for k, v in attrs.items():
if k.find('requantize_') != -1:
assert 'min_calib_range' in v
assert 'max_calib_range' in v
def check_qsym_qdtype(qsym, qdtype):
attrs = qsym.attr_dict()
for k, v in attrs.items():
if k.find('_quantize') != -1:
assert 'out_type' in v
assert v['out_type'] == qdtype
def check_qsym_forward(qsym, qarg_params, qaux_params, data_shape, label_shape=None):
if label_shape is None:
mod = mx.mod.Module(symbol=qsym, label_names=None, context=mx.current_context())
mod.bind(for_training=False,
data_shapes=[('data', data_shape)])
else:
mod = mx.mod.Module(symbol=qsym, context=mx.current_context())
mod.bind(for_training=False,
data_shapes=[('data', data_shape)],
label_shapes=[('softmax_label', label_shape)])
mod.set_params(qarg_params, qaux_params)
data = [mx.random.uniform(-1.0, 1.0, shape=shape) for _, shape in mod.data_shapes]
batch = mx.io.DataBatch(data, [])
mod.forward(batch, is_train=False)
for output in mod.get_outputs():
output.wait_to_read()
batch_size = 4
length = batch_size # specify num of outputs from split op
sym_list = []
name_list = []
dshape_list = []
lshape_list = []
# sym 1
sym_list.append(get_fp32_residual())
name_list.append('sym1')
dshape_list.append((batch_size, 4, 10, 10))
lshape_list.append((batch_size, 10))
# sym 2
sym_list.append(get_fp32_sym_with_multiple_outputs(length))
name_list.append('sym2')
dshape_list.append((length, 4, 4, 10, 10))
lshape_list.append((length, 10))
data = mx.sym.Variable('data')
# sym 3
sym_list.append(mx.sym.Convolution(data, kernel=(1, 1), num_filter=16, name='conv0'))
name_list.append('sym3')
dshape_list.append((batch_size, 4, 10, 10))
lshape_list.append(None)
# sym 4
cell = mx.rnn.LSTMCell(num_hidden=64)
outputs, _ = cell.unroll(length, data)
sym_list.append(mx.sym.Group(outputs))
name_list.append('sym4')
dshape_list.append((batch_size, length, 32))
lshape_list.append(None)
for s, dshape, lshape, name in zip(sym_list, dshape_list, lshape_list, name_list):
if qdtype == 'int8' and is_test_for_mkldnn() and name in ['sym1', 'sym2', 'sym3']:
print('skipped testing test_quantize_model_with_forward for mkldnn cpu int8 since it is not supported yet')
continue
if lshape is None:
mod = Module(symbol=s, label_names=None)
mod.bind(for_training=False,
data_shapes=[('data', dshape)])
else:
mod = Module(symbol=s)
mod.bind(for_training=False,
data_shapes=[('data', dshape)],
label_shapes=[('softmax_label', lshape)])
mod.init_params()
arg_params, aux_params = mod.get_params()
excluded_sym_names = []
# sym3/sym4 don't have such layers
if name not in ['sym3', 'sym4']:
excluded_names = []
if mx.current_context() == mx.cpu():
excluded_names += ['fc', 'conv1']
if mx.current_context() == mx.gpu():
excluded_names += ['sum0', 'relu0', 'relu1']
excluded_names += ['concat']
optional_names = ['pool0']
for skip_optional_names in [False, True]:
if skip_optional_names:
excluded_sym_names = excluded_names
else:
excluded_sym_names = excluded_names + optional_names
qsym, qarg_params, qaux_params = mx.contrib.quant.quantize_model(sym=s,
arg_params=arg_params,
aux_params=aux_params,
excluded_sym_names=excluded_sym_names,
ctx=mx.current_context(),
quantized_dtype=qdtype,
calib_mode='none')
check_params(arg_params, qarg_params, qsym)
check_params(aux_params, qaux_params)
check_qsym_forward(qsym, qarg_params, qaux_params, dshape, lshape)
calib_data = mx.nd.random.uniform(shape=dshape)
calib_data = NDArrayIter(data=calib_data, batch_size=batch_size)
calib_data = DummyIter(calib_data)
qsym, qarg_params, qaux_params = mx.contrib.quant.quantize_model(sym=s,
arg_params=arg_params,
aux_params=aux_params,
excluded_sym_names=excluded_sym_names,
ctx=mx.current_context(),
quantized_dtype=qdtype,
calib_mode='naive',
calib_data=calib_data,
num_calib_examples=20)
check_params(arg_params, qarg_params, qsym)
check_params(aux_params, qaux_params)
check_qsym_calibrated(qsym)
check_qsym_qdtype(qsym, qdtype)
check_qsym_forward(qsym, qarg_params, qaux_params, dshape, lshape)
for qdtype in ['int8', 'uint8']:
check_quantize_model(qdtype)
@with_seed()
def test_quantize_sym_with_calib():
sym = get_fp32_sym()
offline_params = [name for name in sym.list_arguments()
if not name.startswith('data') and not name.endswith('label')]
qsym = mx.contrib.quant._quantize_symbol(sym, offline_params=offline_params)
requantize_op_names = ['requantize_conv', 'requantize_fc']
th_dict = {'conv_output': (np.random.uniform(low=100.0, high=200.0), np.random.uniform(low=100.0, high=200.0)),
'fc_output': (np.random.uniform(low=100.0, high=200.0), np.random.uniform(low=100.0, high=200.0))}
op_name_to_th_name = {'requantize_conv': 'conv_output', 'requantize_fc': 'fc_output'}
cqsym = mx.contrib.quant._calibrate_quantized_sym(qsym, th_dict)
attr_dict = cqsym.attr_dict()
for name in requantize_op_names:
assert name in attr_dict
lhs = float(attr_dict[name]['min_calib_range'])
rhs = th_dict[op_name_to_th_name[name]][0]
assert_almost_equal(np.array([lhs]), np.array([rhs]), rtol=1e-3, atol=1e-4)
lhs = float(attr_dict[name]['max_calib_range'])
rhs = th_dict[op_name_to_th_name[name]][1]
assert_almost_equal(np.array([lhs]), np.array([rhs]), rtol=1e-3, atol=1e-4)
@with_seed()
def test_smooth_distribution():
assert_exception(lambda: mx.contrib.quant._smooth_distribution(np.zeros((2,)), eps=1e-3), ValueError)
dirac_delta = np.zeros((5,))
dirac_delta[2] = 1
smooth_dirac_delta = dirac_delta.copy()
smooth_dirac_delta += 1e-3
smooth_dirac_delta[2] -= 5e-3
assert_almost_equal(mx.contrib.quant._smooth_distribution(dirac_delta, eps=1e-3), smooth_dirac_delta)
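# The expected output above encodes the smoothing rule: every zero bin gains
# eps (1e-3) and each nonzero bin gives up eps * n_zeros / n_nonzeros (here
# 4e-3), so total probability mass is conserved; an all-zero input has no
# mass to redistribute, which is why the ValueError case is checked first.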
@with_seed()
def test_optimal_threshold_adversarial_case():
# The worst case for the optimal_threshold function is a histogram
# concentrated at one edge: [0, 0, ..., 1000]. The constant array below
# produces exactly that histogram, and the optimal threshold should then
# be the max, i.e. 2.
arr = np.array([2] * 1000)
for dtype in ['uint8', 'int8', 'auto']:
res = mx.contrib.quant._get_optimal_threshold(arr, dtype, num_quantized_bins=5)
# The threshold should be 2.
assert abs(res[3] - 2) < 1e-5
@with_seed()
def test_get_optimal_thresholds():
# Given an ndarray with elements following a uniform distribution, the optimal threshold
# for quantizing the ndarray should be either abs(min(nd)) or abs(max(nd)).
def get_threshold(nd):
min_nd = mx.nd.min(nd)
max_nd = mx.nd.max(nd)
return mx.nd.maximum(mx.nd.abs(min_nd), mx.nd.abs(max_nd)).asnumpy()
for dtype in ['uint8', 'int8', 'auto']:
nd_dict = {'layer1': mx.nd.uniform(low=-10.532, high=11.3432, shape=(8, 3, 23, 23), dtype=np.float64)}
expected_threshold = get_threshold(nd_dict['layer1'])
th_dict = mx.contrib.quant._get_optimal_thresholds(nd_dict, dtype)
assert 'layer1' in th_dict
assert_almost_equal(np.array([th_dict['layer1'][1]]), expected_threshold, rtol=1e-2, atol=1e-4)
if __name__ == "__main__":
import nose
nose.runmodule()