# blob: fd5e3d802629e2db4257a63b7c22b58d8e3f8a34 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Some of the tests using CUDNN require a special GPU instruction called dp4a.
Ref: http://images.nvidia.com/content/pdf/tesla/184457-Tesla-P4-Datasheet-NV-Final-Letter-Web.pdf
"""
import os
import mxnet as mx
import numpy as onp
from mxnet import npx
from mxnet.util import use_np
from mxnet.gluon.model_zoo import vision
from mxnet.test_utils import assert_almost_equal, assert_exception, rand_ndarray, rand_shape_nd, same, DummyIter
from common import xfail_when_nonstandard_decimal_separator
from mxnet.io import NDArrayIter
import unittest
import operator
# Turn MXNet's NumPy semantics off at import time; the tests below that work
# with `mx.np` / `npx` arrays opt back in individually through `@use_np`.
npx.reset_np()
def collect_block_args_aux(block, sym):
    """Split the parameters of `block` into symbol arguments and auxiliary states.

    Parameters
    ----------
    block : object exposing ``collect_params()``
        ``collect_params()`` must return a mapping of parameter name to an
        object with a ``_reduce()`` method.
    sym : object exposing ``list_arguments()`` and ``list_auxiliary_states()``
        Symbol whose argument / auxiliary-state names decide where each
        parameter goes.

    Returns
    -------
    tuple(dict, dict)
        ``(arg_params, aux_params)``, each mapping a parameter name to the
        value returned by that parameter's ``_reduce()``. Parameters whose
        name is in neither list are dropped; a name present in both lists is
        treated as an argument.
    """
    # Query the symbol once up front instead of once per parameter, and use
    # sets so each membership test is O(1).
    arg_names = set(sym.list_arguments())
    aux_names = set(sym.list_auxiliary_states())

    arg_params, aux_params = {}, {}
    for name, param in block.collect_params().items():
        if name in arg_names:
            arg_params[name] = param._reduce()
        elif name in aux_names:
            aux_params[name] = param._reduce()
    return arg_params, aux_params
def is_test_for_gpu():
    """Return True when the current MXNet device is a GPU."""
    device = mx.current_device()
    return device.device_type == 'gpu'
def is_test_for_dnnl():
    """Return True on CPU when ENABLE_ONEDNN_QUANTIZATION_TEST is set to '1'."""
    if mx.current_device().device_type != 'cpu':
        return False
    return os.environ.get('ENABLE_ONEDNN_QUANTIZATION_TEST') == '1'
def is_test_for_native_cpu():
    """Return True on CPU when ENABLE_ONEDNN_QUANTIZATION_TEST is not set at all.

    Any value of the variable (even an empty string) makes this return False.
    """
    # `is None` is the correct identity test for "variable absent".
    return (mx.current_device().device_type == 'cpu'
            and os.environ.get('ENABLE_ONEDNN_QUANTIZATION_TEST') is None)
def get_low_high(qtype):
    """Return the (low, high) value range of an integer type as floats.

    'uint8' maps to (0.0, 255.0); every other value is treated as int8 and
    maps to (-128.0, 127.0).
    """
    return (0.0, 255.0) if qtype == 'uint8' else (-128.0, 127.0)
def test_quantize_float32_to_int8():
    """Quantize random float32 data to int8 and compare with a NumPy reference."""
    fp32_input = rand_ndarray(rand_shape_nd(4), 'default', dtype='float32')
    nd_min = mx.nd.min(fp32_input)
    nd_max = mx.nd.max(fp32_input)
    quantized, out_min, out_max = mx.nd.contrib.quantize(fp32_input, nd_min, nd_max, out_type='int8')

    # Symmetric reference quantization: the larger absolute bound maps to 127.
    int8_max = 127.0
    abs_bound = onp.maximum(onp.abs(nd_min.asscalar()), onp.abs(nd_max.asscalar()))
    scale = int8_max / abs_bound

    assert quantized.dtype == onp.int8
    assert out_min.dtype == onp.float32
    assert out_max.dtype == onp.float32
    assert same(out_min.asscalar(), -abs_bound)
    assert same(out_max.asscalar(), abs_bound)

    fp32_np = fp32_input.asnumpy()
    magnitude = onp.minimum(onp.abs(fp32_np) * scale + 0.5, int8_max)
    expected = (onp.sign(fp32_np) * magnitude).astype(onp.int8)
    assert_almost_equal(quantized.asnumpy(), expected, atol = 1)
def test_calibrated_quantize_v2_bfloat16_to_int8():
    """Quantize bfloat16 data to int8 with explicit calibration bounds and
    compare the result with a NumPy reference."""
    bf16_input = mx.nd.random.normal(0, 1, rand_shape_nd(4)).astype('bfloat16')
    calib_min = mx.nd.min(bf16_input).asscalar()
    calib_max = mx.nd.max(bf16_input).asscalar()
    quantized, out_min, out_max = mx.nd.contrib.quantize_v2(bf16_input, 'int8', calib_min, calib_max)

    # Symmetric reference quantization: the larger absolute bound maps to 127.
    int8_max = 127.0
    abs_bound = onp.maximum(onp.abs(calib_min), onp.abs(calib_max))
    scale = int8_max / abs_bound

    assert quantized.dtype == onp.int8
    assert out_min.dtype == onp.float32
    assert out_max.dtype == onp.float32
    assert same(out_min.asscalar(), -abs_bound)
    assert same(out_max.asscalar(), abs_bound)

    input_np = bf16_input.asnumpy()
    magnitude = onp.minimum(onp.abs(input_np) * scale + 0.5, int8_max)
    expected = (onp.sign(input_np) * magnitude).astype(onp.int8)
    assert_almost_equal(quantized.asnumpy(), expected, atol=1)
def test_dequantize_int8_to_float32():
    """Dequantize int8 data through the NDArray and the symbolic API and
    compare both with a NumPy baseline."""
    def verify(dequantized, expected):
        # Shared result check for both API flavours.
        assert dequantized.dtype == onp.float32
        assert_almost_equal(dequantized.asnumpy(), expected, atol = 1)

    def check_nd_array_dequantization(qdata, min_range, max_range, expected):
        verify(mx.nd.contrib.dequantize(qdata, min_range, max_range, out_type='float32'), expected)

    def check_symbolic_api_dequantization(qdata, min_range, max_range, expected):
        dequant_sym = mx.sym.contrib.dequantize(mx.sym.Variable('data'),
                                                mx.sym.Variable('min_range'),
                                                mx.sym.Variable('max_range'),
                                                out_type='float32')
        executor = dequant_sym._bind(ctx=mx.current_device(),
                                     args={'data':qdata, 'min_range':min_range, 'max_range':max_range})
        verify(executor.forward()[0], expected)

    real_range = 128
    shape = rand_shape_nd(4)
    qdata_np = onp.random.uniform(low=-127, high=127, size=shape).astype(dtype=onp.int8)

    # MXNet inputs: the int8 payload plus its symmetric float32 range.
    qdata = mx.nd.array(qdata_np, dtype=onp.int8)
    min_range = mx.nd.array([-real_range], dtype=onp.float32)
    max_range = mx.nd.array([real_range], dtype=onp.float32)

    # NumPy baseline: scale the int8 values by real_range / 127.
    expected_result = qdata_np * (real_range / 127.0)

    # test nd array implementation.
    check_nd_array_dequantization(qdata, min_range, max_range, expected_result)
    # test symbolic api implementation.
    check_symbolic_api_dequantization(qdata, min_range, max_range, expected_result)
def test_requantize_int32_to_int8():
    """Requantize int32 data to int8 through the NDArray and Gluon APIs, with
    and without calibration ranges, and compare with a NumPy baseline."""
    def quantized_int32_to_float(qdata, min_range, max_range):
        # Map int32-quantized values back to floats using the symmetric range.
        assert qdata.dtype == 'int32'
        int32_max = onp.iinfo('int32').max
        abs_bound = onp.maximum(onp.abs(min_range), onp.abs(max_range))
        return qdata.astype('float32') * (float(abs_bound) / float(int32_max))

    def float_to_quantized_int8(data, min_range, max_range):
        # Symmetric float -> int8 quantization, rounding half away from zero.
        assert data.dtype == 'float32'
        abs_bound = onp.maximum(onp.abs(min_range), onp.abs(max_range))
        int8_max = onp.iinfo('int8').max
        scale = float(int8_max) / float(abs_bound)
        magnitude = onp.minimum(onp.abs(data) * scale + 0.5, int8_max)
        return (onp.sign(data) * magnitude).astype('int8')

    def requantize(qdata, min_data, max_data, real_range):
        as_float = quantized_int32_to_float(qdata, min_data, max_data)
        as_int8 = float_to_quantized_int8(as_float, -real_range, real_range)
        return as_int8, -real_range, real_range

    def requantize_baseline(qdata, min_data, max_data, min_calib_range=None, max_calib_range=None):
        if min_calib_range is None or max_calib_range is None:
            # No calibration: derive the range from the data itself.
            observed_min = quantized_int32_to_float(onp.min(qdata), min_data, max_data)
            observed_max = quantized_int32_to_float(onp.max(qdata), min_data, max_data)
            real_range = onp.maximum(onp.abs(observed_min), onp.abs(observed_max))
        else:
            real_range = onp.maximum(onp.abs(min_calib_range), onp.abs(max_calib_range))
        return requantize(qdata, min_data, max_data, real_range)

    def check_requantize(shape, min_calib_range=None, max_calib_range=None):
        qdata = mx.nd.random.uniform(low=-1000.0, high=1000.0, shape=shape).astype('int32')
        min_range = mx.nd.array([-1010.0])
        max_range = mx.nd.array([1020.0])
        # Calibration arguments are forwarded only when both are given.
        calib_kwargs = {}
        if min_calib_range is not None and max_calib_range is not None:
            calib_kwargs = {'min_calib_range': min_calib_range,
                            'max_calib_range': max_calib_range}
        qdata_int8, min_output, max_output = mx.nd.contrib.requantize(qdata, min_range, max_range,
                                                                      **calib_kwargs)

        expected_int8, expected_min, expected_max = requantize_baseline(
            qdata.asnumpy(), min_range.asscalar(), max_range.asscalar(),
            min_calib_range=min_calib_range, max_calib_range=max_calib_range)
        assert_almost_equal(qdata_int8.asnumpy(), expected_int8, atol = 1)
        assert_almost_equal(min_output.asnumpy(), onp.array([expected_min]))
        assert_almost_equal(max_output.asnumpy(), onp.array([expected_max]))

    @use_np
    def check_requantize_with_gluon(shape, min_calib_range=None, max_calib_range=None):
        qdata = mx.np.random.uniform(low=-1000.0, high=1000.0, size=shape).astype('int32')
        min_range = mx.np.array([-1010.0])
        max_range = mx.np.array([1020.0])

        class RequantizeBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, min_calib_range=None, max_calib_range=None, **kwargs):
                super(RequantizeBlock, self).__init__(**kwargs)
                self.min_calib_range = min_calib_range
                self.max_calib_range = max_calib_range

            def forward(self, x, min_range, max_range):
                # Calibration arguments are forwarded only when both are set.
                calib_kwargs = {}
                if self.min_calib_range is not None and self.max_calib_range is not None:
                    calib_kwargs = {'min_calib_range': self.min_calib_range,
                                    'max_calib_range': self.max_calib_range}
                return npx.requantize(x, min_range, max_range, **calib_kwargs)

        requant = RequantizeBlock(min_calib_range, max_calib_range) # m*_calib_ranges can be None
        qdata_int8, min_output, max_output = requant(qdata, min_range, max_range)

        expected_int8, expected_min, expected_max = requantize_baseline(
            qdata.asnumpy(), min_range.item(), max_range.item(),
            min_calib_range=min_calib_range, max_calib_range=max_calib_range)
        assert_almost_equal(qdata_int8.asnumpy(), expected_int8, atol = 1)
        assert_almost_equal(min_output.asnumpy(), onp.array([expected_min]))
        assert_almost_equal(max_output.asnumpy(), onp.array([expected_max]))

    # (shape, calibration kwargs) pairs exercised by both API flavours.
    cases = [
        ((3, 4, 10, 10), {}),
        ((32, 3, 23, 23), {}),
        ((3, 4, 10, 10), {'min_calib_range': -1050.0, 'max_calib_range': 1040.0}),
        ((32, 3, 23, 23), {'min_calib_range': -134.349, 'max_calib_range': 523.43}),
    ]
    # test with gluon API.
    for shape, calib in cases:
        check_requantize_with_gluon(shape, **calib)
    # Test with nd array API
    for shape, calib in cases:
        check_requantize(shape, **calib)
@use_np
def test_quantized_conv():
    """Compare `npx.quantized_conv` against an fp32 Gluon convolution.

    Inputs, weights and biases are integer-valued floats so that the fp32
    convolution and the quantized one operate on the same values.
    """
    def check_quantized_conv(data_shape, kernel, num_filter, pad, stride, dilate, use_bias, qdtype):
        """Run one fp32-vs-quantized comparison.

        `data_shape` of length 4 selects Conv2D and length 5 selects Conv3D;
        `qdtype` is the dtype ('int8' or 'uint8') the input is cast to before
        it is fed to the quantized operator. Unsupported device / dtype /
        layout combinations print a message and return without checking.
        """
        if is_test_for_native_cpu():
            print('skipped testing quantized_conv for native cpu since it is not supported yet')
            return
        elif is_test_for_dnnl():
            # (TODO)Xinyu: https://github.com/apache/mxnet/issues/16830
            print('skipped testing quantized_conv for oneDNN cpu since it is a flaky case')
            return
        elif qdtype == 'uint8' and is_test_for_gpu():
            print('skipped testing quantized_conv for gpu uint8 since it is not supported yet')
            return
        elif is_test_for_gpu() and len(data_shape) != 4:
            print('skipped testing quantized_conv for gpu 5d layout since it is not supported yet')
            return

        # run fp32 conv
        if len(data_shape) == 4:
            convfp32 = mx.gluon.nn.Conv2D(channels=num_filter, kernel_size=kernel, strides=stride,
                                          padding=pad, dilation=dilate, use_bias=use_bias)
        elif len(data_shape) == 5:
            convfp32 = mx.gluon.nn.Conv3D(channels=num_filter, kernel_size=kernel, strides=stride,
                                          padding=pad, dilation=dilate, use_bias=use_bias)
        else:
            print('unsupported shape')
            assert False

        # uint8 inputs are drawn from [0, 127), int8 inputs from [-127, 127).
        if qdtype == 'uint8':
            data_low = 0.0
            data_high = 127.0
        else:
            data_low = -127.0
            data_high = 127.0

        convfp32.initialize()
        # Round-trip through int32 so every value is an exact integer.
        input_data = mx.np.random.uniform(low=data_low,
                                          high=data_high,
                                          size=data_shape
                                          ).astype('int32').astype('float32')
        convfp32(input_data) # initialize params
        npx.waitall()
        fp32_params = convfp32.collect_params()
        weight_shape = fp32_params['weight'].shape
        # Replace the initialized parameters with integer-valued ones.
        new_args = dict()
        new_args['weight'] = mx.np.random.uniform(low=-127.0,
                                                  high=127.0,
                                                  size=weight_shape
                                                  ).astype('int32').astype('float32')
        if use_bias:
            new_args['bias'] = mx.np.random.uniform(low=-127.0,
                                                    high=127.0,
                                                    size=fp32_params['bias'].shape
                                                    ).astype('int32').astype('float32')
        convfp32.load_dict(new_args, cast_dtype=True, dtype_source='saved')

        output = convfp32(input_data)

        # run quantized conv
        class QuantConv(mx.gluon.nn.HybridBlock):
            """Block that holds int8 weights/bias plus their float ranges and
            forwards them to `npx.quantized_conv`."""
            def __init__(self, channels, kernel_size, strides=(1, 1),
                         padding=(0, 0), dilation=(1, 1), use_bias=True, **kwargs):
                super(QuantConv, self).__init__(**kwargs)
                self.use_bias = use_bias
                # NOTE(review): 'layout' is fixed to 'NCHW' even when the
                # enclosing check is called with a 5-d data_shape -- confirm.
                self._kwargs = {'kernel': kernel_size, 'stride': strides, 'dilate': dilation,
                                'pad': padding, 'num_filter': channels, 'no_bias': not use_bias, 'num_group': 1,
                                'layout': 'NCHW'}

                self.min_data = mx.gluon.Parameter('min_data', dtype='float32', shape=(1), allow_deferred_init=True)
                self.max_data = mx.gluon.Parameter('max_data', dtype='float32', shape=(1), allow_deferred_init=True)

                # weight_shape and num_filter come from the enclosing check.
                self.weight = mx.gluon.Parameter('weight', dtype='int8', shape=weight_shape, allow_deferred_init=True)
                self.min_weight = mx.gluon.Parameter('min_weight', dtype='float32', shape=(1), allow_deferred_init=True)
                self.max_weight = mx.gluon.Parameter('max_weight', dtype='float32', shape=(1), allow_deferred_init=True)

                if use_bias:
                    self.bias = mx.gluon.Parameter('bias', dtype='int8', shape=(num_filter,), allow_deferred_init=True)
                    self.min_bias = mx.gluon.Parameter('min_bias', dtype='float32', shape=(1), allow_deferred_init=True)
                    self.max_bias = mx.gluon.Parameter('max_bias', dtype='float32', shape=(1), allow_deferred_init=True)

            def forward(self, x):
                # Move every parameter to the device of the input before use.
                device = x.device
                weight = self.weight.data().to_device(device)
                bias = self.bias.data().to_device(device) if self.use_bias else None
                min_data = self.min_data.data().to_device(device)
                max_data = self.max_data.data().to_device(device)
                min_weight = self.min_weight.data().to_device(device)
                max_weight = self.max_weight.data().to_device(device)
                min_bias = self.min_bias.data().to_device(device) if self.use_bias else None
                max_bias = self.max_bias.data().to_device(device) if self.use_bias else None
                out = npx.quantized_conv(data=x, weight=weight, bias=bias,
                                         min_data=min_data, max_data=max_data,
                                         min_weight=min_weight, max_weight=max_weight,
                                         min_bias=min_bias, max_bias=max_bias,
                                         **self._kwargs)
                return out

        convint8 = QuantConv(channels=num_filter, kernel_size=kernel, strides=stride,
                             padding=pad, dilation=dilate, use_bias=use_bias)

        # Every tensor is given the range [-127, 127], i.e. a scale of 1.
        quantized_range = 127.0
        qargs = {
            'weight': new_args['weight'].astype('int8'),
            'min_data': mx.np.array([-quantized_range]),
            'max_data': mx.np.array([quantized_range]),
            'min_weight': mx.np.array([-quantized_range]),
            'max_weight': mx.np.array([quantized_range])
        }
        if use_bias:
            qargs.update({
                'bias': new_args['bias'].astype('int8'),
                'min_bias': mx.np.array([-quantized_range]),
                'max_bias': mx.np.array([quantized_range]),
            })

        convint8.load_dict(qargs, cast_dtype=True, dtype_source='saved')
        qoutput, min_range, max_range = convint8(input_data.astype(qdtype))

        if use_bias:
            # with bias added, no element may differ from the fp32 result by more than 2
            diff = mx.np.abs(output - qoutput.astype(output.dtype))
            cond = mx.np.less(2, diff).sum().item()
            assert cond == 0
        else:
            assert_almost_equal(output.asnumpy(), qoutput.asnumpy(), atol = 1)

    for qdtype in ['int8', 'uint8']:
        check_quantized_conv((3, 4, 28, 28), (3, 3), 128, (1, 1), (1, 1), (1, 1), True, qdtype)
        check_quantized_conv((3, 4, 28, 28), (3, 3), 128, (1, 1), (1, 1), (1, 1), False, qdtype)
        check_quantized_conv((1, 3, 4, 28, 28), (1, 3, 3), 128, (1, 1, 1), (1, 1, 1), (1, 1, 1), False, qdtype)
        check_quantized_conv((1, 3, 4, 28, 28), (1, 3, 3), 128, (1, 1, 1), (1, 1, 1), (1, 1, 1), True, qdtype)
        check_quantized_conv((1, 3, 4, 28, 28), (1, 3, 3), 128, (1, 1, 1), (1, 1, 1), (2, 2, 2), False, qdtype)
        check_quantized_conv((1, 3, 4, 28, 28), (1, 3, 3), 128, (1, 1, 1), (1, 1, 1), (2, 2, 2), True, qdtype)
@use_np
def test_quantized_elemwise_add():
    """Compare `npx.quantized_elemwise_add` with a plain fp32 addition for
    every int8/uint8 combination of the two operands."""
    supported_dtypes = ('uint8', 'int8')

    def check_quantized_elemwise_add(data_shape, qdtypeA, qdtypeB):
        """Run one comparison; unsupported devices / dtypes print a message
        and return without checking."""
        if is_test_for_native_cpu():
            print('skipped testing quantized_elemwise_add for native cpu since it is not supported yet')
            return
        elif qdtypeA not in supported_dtypes or qdtypeB not in supported_dtypes:
            print('skipped testing quantized_elemwise_add for not supported data type')
            return
        elif is_test_for_gpu():
            print('skipped testing quantized_elemwise_add for gpu since it is not supported yet')
            return

        class ElemwiseSumBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, **kwargs):
                super(ElemwiseSumBlock, self).__init__(**kwargs)

            def forward(self, dataA, dataB):
                return dataA + dataB

        class QuantElemwiseSumBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, **kwargs):
                super(QuantElemwiseSumBlock, self).__init__(**kwargs)

            def forward(self, dataA, dataB, dataA_min, dataA_max, dataB_min, dataB_max):
                return npx.quantized_elemwise_add(dataA, dataB, dataA_min, dataA_max, dataB_min, dataB_max)

        # fp32 reference on integer-valued operands drawn from each dtype's range
        elemwise_add_fp32 = ElemwiseSumBlock()
        dataA_low, dataA_high = get_low_high(qdtypeA)
        dataB_low, dataB_high = get_low_high(qdtypeB)
        dataA_val = mx.np.random.uniform(low=dataA_low, high=dataA_high, size=data_shape).astype('int32').astype('float32')
        dataB_val = mx.np.random.uniform(low=dataB_low, high=dataB_high, size=data_shape).astype('int32').astype('float32')
        output = elemwise_add_fp32(dataA_val, dataB_val)
        mx.nd.waitall()

        # run quantized
        quantized_elemwise_add = QuantElemwiseSumBlock()
        dataA_val_int8 = dataA_val.astype(qdtypeA)
        dataB_val_int8 = dataB_val.astype(qdtypeB)
        min_dataA = mx.np.array([dataA_low])
        max_dataA = mx.np.array([dataA_high])
        min_dataB = mx.np.array([dataB_low])
        max_dataB = mx.np.array([dataB_high])
        qoutput, min_range, max_range = quantized_elemwise_add(dataA_val_int8, dataB_val_int8,
                                                               min_dataA, max_dataA,
                                                               min_dataB, max_dataB)
        # Rescale the quantized sum to real values: max_range / INT32_MAX
        # (presumably the output is int32-quantized -- confirm against the op).
        int8_rslt = qoutput.astype(output.dtype) * max_range / 0x7fffffff
        # No element may differ from the fp32 result by more than 2.
        diff = mx.np.abs(output - int8_rslt)
        cond = mx.np.less(2, diff).sum().item()
        assert cond == 0

    check_quantized_elemwise_add((4, 6), 'uint8', 'int8')
    check_quantized_elemwise_add((13, 74, 52), 'uint8', 'uint8')
    check_quantized_elemwise_add((3, 4, 56, 56), 'int8', 'uint8')
    check_quantized_elemwise_add((32, 56, 64, 11), 'int8', 'int8')
@use_np
def test_quantized_npi_add():
    """Compare `npx.quantized_npi_add` with a plain fp32 addition, with and
    without broadcasting one operand from an all-ones shape."""
    supported_dtypes = ('uint8', 'int8')

    def check_quantized_npi_add(data_shape, qdtypeA, qdtypeB, broadcast=None):
        """Run one comparison.

        `broadcast` selects which operand gets shape ``(1,) * ndim``: 'A' for
        the first operand, any other truthy value for the second, and a falsy
        value for no broadcasting. Unsupported devices / dtypes print a
        message and return without checking.
        """
        if is_test_for_native_cpu():
            print('skipped testing quantized_npi_add for native cpu since it is not supported yet')
            return
        elif qdtypeA not in supported_dtypes or qdtypeB not in supported_dtypes:
            print('skipped testing quantized_npi_add for not supported data type')
            return
        elif is_test_for_gpu():
            print('skipped testing quantized_npi_add for gpu since it is not supported yet')
            return

        class ElemwiseSumBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, **kwargs):
                super(ElemwiseSumBlock, self).__init__(**kwargs)

            def forward(self, dataA, dataB):
                return dataA + dataB

        class QuantElemwiseSumBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, **kwargs):
                super(QuantElemwiseSumBlock, self).__init__(**kwargs)

            def forward(self, dataA, dataB, dataA_min, dataA_max, dataB_min, dataB_max):
                return npx.quantized_npi_add(dataA, dataB, dataA_min, dataA_max, dataB_min, dataB_max)

        elemwise_add_fp32 = ElemwiseSumBlock()
        dataA_low, dataA_high = get_low_high(qdtypeA)
        dataB_low, dataB_high = get_low_high(qdtypeB)

        # The broadcast operand keeps the rank but has every dimension set to 1.
        data_shapeA = data_shape
        data_shapeB = data_shape
        if broadcast:
            ones_shape = (1,) * len(data_shape)
            if broadcast == 'A':
                data_shapeA = ones_shape
            else:
                data_shapeB = ones_shape

        dataA_val = mx.np.random.uniform(low=dataA_low, high=dataA_high, size=data_shapeA).astype('int32').astype('float32')
        dataB_val = mx.np.random.uniform(low=dataB_low, high=dataB_high, size=data_shapeB).astype('int32').astype('float32')

        output = elemwise_add_fp32(dataA_val, dataB_val)

        # run quantized
        quantized_elemwise_add = QuantElemwiseSumBlock()
        dataA_val_int8 = dataA_val.astype(qdtypeA)
        dataB_val_int8 = dataB_val.astype(qdtypeB)
        min_dataA = mx.np.array([dataA_low])
        max_dataA = mx.np.array([dataA_high])
        min_dataB = mx.np.array([dataB_low])
        max_dataB = mx.np.array([dataB_high])
        qoutput, min_range, max_range = quantized_elemwise_add(dataA_val_int8, dataB_val_int8,
                                                               min_dataA, max_dataA,
                                                               min_dataB, max_dataB)
        # Rescale the quantized sum to real values: max_range / INT32_MAX
        # (presumably the output is int32-quantized -- confirm against the op).
        int8_rslt = qoutput.astype(output.dtype) * max_range / 0x7fffffff
        # No element may differ from the fp32 result by more than 2.
        diff = mx.np.abs(output - int8_rslt)
        cond = mx.np.less(2, diff).sum().item()
        assert cond == 0

    check_quantized_npi_add((4, 6), 'uint8', 'int8')
    check_quantized_npi_add((13, 74, 52), 'uint8', 'uint8')
    check_quantized_npi_add((3, 4, 56, 56), 'int8', 'uint8')
    check_quantized_npi_add((32, 56, 64, 11), 'int8', 'int8')

    check_quantized_npi_add((4, 6), 'uint8', 'int8', 'A')
    check_quantized_npi_add((13, 74, 52), 'uint8', 'uint8', 'B')
    check_quantized_npi_add((3, 4, 56, 56), 'int8', 'uint8', 'A')
    check_quantized_npi_add((32, 56, 64, 11), 'int8', 'int8', 'B')
@use_np
def test_quantized_elemwise_mul():
    """Compare `npx.quantized_elemwise_mul` with an fp32 `mx.np.multiply`."""
    def check_quantized_elemwise_mul(data_shape, qtype):
        """Run one comparison; only 'int8' is exercised, every other dtype
        and unsupported device prints a message and returns."""
        if is_test_for_native_cpu():
            print('skipped testing quantized_elemwise_mul for native cpu since it is not supported yet')
            return
        elif qtype != 'int8':
            print('skipped testing quantized_elemwise_mul for not supported data type')
            return
        elif is_test_for_gpu():
            print('skipped testing quantized_elemwise_mul for gpu since it is not supported yet')
            return

        class ElemwiseMulBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, **kwargs):
                super(ElemwiseMulBlock, self).__init__(**kwargs)

            def forward(self, dataA, dataB):
                return mx.np.multiply(dataA, dataB)

        class QuantElemwiseMulBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, **kwargs):
                super(QuantElemwiseMulBlock, self).__init__(**kwargs)

            def forward(self, dataA, dataB, dataA_min, dataA_max, dataB_min, dataB_max):
                return npx.quantized_elemwise_mul(dataA, dataB, dataA_min, dataA_max, dataB_min, dataB_max)

        # fp32 reference on integer-valued operands
        elemwise_mul_fp32 = ElemwiseMulBlock()
        data_low, data_high = get_low_high(qtype)
        dataA_val = mx.np.random.uniform(low=data_low, high=data_high, size=data_shape).astype('int32').astype('float32')
        dataB_val = mx.np.random.uniform(low=data_low, high=data_high, size=data_shape).astype('int32').astype('float32')

        output = elemwise_mul_fp32(dataA_val, dataB_val)

        # run quantized
        quantized_elemwise_mul = QuantElemwiseMulBlock()
        dataA_val_int8 = dataA_val.astype(qtype)
        dataB_val_int8 = dataB_val.astype(qtype)
        min_dataA = mx.np.array([data_low])
        max_dataA = mx.np.array([data_high])
        min_dataB = mx.np.array([data_low])
        max_dataB = mx.np.array([data_high])
        qoutput, min_range, max_range = quantized_elemwise_mul(dataA_val_int8, dataB_val_int8,
                                                               min_dataA, max_dataA,
                                                               min_dataB, max_dataB)

        fp32_rslt = output.asnumpy()
        int8_rslt = qoutput.astype(output.dtype)
        assert_almost_equal(fp32_rslt, int8_rslt, atol = 1e-4)

    for qtype in ['int8', 'uint8']:
        check_quantized_elemwise_mul((4, 6), qtype)
        check_quantized_elemwise_mul((13, 74, 52), qtype)
        check_quantized_elemwise_mul((3, 4, 56, 56), qtype)
        check_quantized_elemwise_mul((32, 56, 64, 11), qtype)
@use_np
def test_quantized_pooling():
    """Compare `npx.quantized_pooling` with fp32 `npx.pooling` for max/avg
    pooling, 4-d and 5-d inputs, and both pooling conventions."""
    def check_quantized_pooling(data_shape, kernel, pool_type, pad, stride, global_pool, qdtype, convention='valid'):
        """Run one comparison; unsupported device / dtype / layout
        combinations print a message and return without checking."""
        if is_test_for_native_cpu():
            print('skipped testing quantized_pooling for native cpu since it is not supported yet')
            return
        elif qdtype == 'uint8' and is_test_for_gpu():
            print('skipped testing quantized_pooling for gpu uint8 since it is not supported yet')
            return
        elif is_test_for_gpu() and len(data_shape) != 4:
            print('skipped testing quantized_pooling for gpu 5d layout since it is not supported yet')
            return

        class PoolingBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, kernel=kernel, pad=pad, stride=stride,
                         pool_type=pool_type, global_pool=global_pool, cudnn_off=False,
                         pooling_convention=convention):
                super(PoolingBlock, self).__init__()
                # Honour the constructor arguments rather than hard-coding
                # cudnn_off / reading the convention from the closure.
                self._kwargs = {'kernel': kernel, 'pad': pad, 'stride': stride,
                                'pool_type': pool_type, 'global_pool': global_pool,
                                'cudnn_off': cudnn_off, 'pooling_convention': pooling_convention}

            def forward(self, data):
                return npx.pooling(data, **self._kwargs)

        class QuantPoolingBlock(mx.gluon.nn.HybridBlock):
            def __init__(self, kernel=kernel, pad=pad, stride=stride,
                         pool_type=pool_type, global_pool=global_pool,
                         cudnn_off=False, pooling_convention=convention):
                super(QuantPoolingBlock, self).__init__()
                self._kwargs = {'kernel': kernel, 'pad': pad, 'stride': stride,
                                'pool_type': pool_type, 'global_pool': global_pool,
                                'cudnn_off': cudnn_off, 'pooling_convention': pooling_convention}

            def forward(self, data, min_data, max_data):
                return npx.quantized_pooling(data, min_data, max_data, **self._kwargs)

        pooling_fp32 = PoolingBlock()
        # uint8 inputs are drawn from [0, 127), int8 inputs from [-127, 127).
        data_low = 0.0 if qdtype == 'uint8' else -127.0
        data_high = 127.0

        # Round-trip through int32 so every value is an exact integer.
        input_data = mx.np.random.uniform(low=data_low,
                                          high=data_high,
                                          size=data_shape
                                          ).astype('int32').astype('float32')
        output = pooling_fp32(input_data)

        quantized_pooling = QuantPoolingBlock(kernel=kernel, pad=pad, stride=stride,
                                              pool_type=pool_type, global_pool=global_pool,
                                              pooling_convention=convention)

        int8_input_data = input_data.astype(qdtype)
        quantized_range = 127.0
        min_data = mx.np.array([-quantized_range])
        max_data = mx.np.array([quantized_range])

        qoutput, min_range, max_range = quantized_pooling(int8_input_data, min_data, max_data)

        if pool_type == 'max':
            assert_almost_equal(output.asnumpy(), qoutput.asnumpy())
        elif pool_type == 'avg':  # for avg pooling, fp32 and int8 may be different due to rounding errors
            diff = mx.np.abs(output - qoutput.astype(output.dtype))
            cond = mx.np.less(2, diff).sum().item()
            assert cond == 0

    # (data_shape, kernel, pool_type, pad, stride); each entry is run with
    # global_pool False and then True.
    cases = [
        ((3, 4, 56, 56), (3, 3), 'max', (0, 0), (2, 2)),
        ((3, 512, 7, 7), (7, 7), 'avg', (0, 0), (1, 1)),
        ((3, 4, 3, 56, 56), (1, 3, 3), 'max', (0, 0, 0), (1, 2, 2)),
        ((3, 512, 3, 7, 7), (1, 7, 7), 'avg', (0, 0, 0), (1, 2, 2)),
    ]
    for qdtype in ['int8', 'uint8']:
        for pooling_convention in ('valid', 'full'):
            for data_shape, kernel, pool_type, pad, stride in cases:
                for global_pool in (False, True):
                    check_quantized_pooling(data_shape, kernel, pool_type, pad, stride,
                                            global_pool, qdtype, pooling_convention)
@use_np
def test_quantized_fc():
    """Compare `npx.quantized_fully_connected` with an fp32 Gluon Dense layer
    on integer-valued data, weights and (optionally) bias."""
    def check_quantized_fc(data_shape, num_hidden, use_bias, qdtype, flatten=True):
        """Run one fp32-vs-quantized comparison.

        On native CPU the check runs only when the BUILD_TAG environment
        variable contains "MKL"; uint8 on GPU is skipped.
        """
        if is_test_for_native_cpu():
            # A missing BUILD_TAG is treated the same as one without "MKL".
            has_mkl = "MKL" in os.environ.get('BUILD_TAG', '')
            if not has_mkl:
                print('skipped testing quantized_fc on cpu since s8u8s32 is only supported by MKL BLAS library')
                return
        elif qdtype == 'uint8' and is_test_for_gpu():
            print('skipped testing quantized_fc for gpu uint8 since it is not supported yet')
            return

        def maxabs(a, b):
            return mx.np.maximum(mx.np.abs(a), mx.np.abs(b))

        int8_range = 127.0
        if qdtype == 'uint8':
            data_low = 0.0
            data_high = 63.0
            quantized_range = 255.0
        else:
            data_low = -63.0
            data_high = 63.0
            quantized_range = 127.0

        # Round-trip through int32 so every value is an exact integer.
        data = mx.np.random.uniform(low=data_low,
                                    high=data_high,
                                    size=data_shape
                                    ).astype('int32').astype('float32')
        fc_fp32 = mx.gluon.nn.Dense(units=num_hidden, use_bias=use_bias, flatten=flatten)
        fc_fp32.initialize()
        fc_fp32(data)  # first forward pass so the parameter shapes are known
        npx.waitall()
        fp32_params = fc_fp32.collect_params()
        weight_shape = fp32_params['weight'].shape

        new_args = dict()
        new_args['weight'] = mx.np.random.uniform(low=data_low,
                                                  high=data_high,
                                                  size=fp32_params['weight'].shape
                                                  ).astype('int32').astype('float32')
        data_min = mx.np.min(data).astype('float32')
        data_max = mx.np.max(data).astype('float32')
        weight_min = mx.np.min(new_args['weight']).astype('float32')
        weight_max = mx.np.max(new_args['weight']).astype('float32')
        data_range = maxabs(data_min, data_max)
        weight_range = maxabs(weight_min, weight_max)

        if use_bias:
            bias = mx.np.random.uniform(low=data_low,
                                        high=data_high,
                                        size=fp32_params['bias'].shape
                                        ).astype('int32').astype('float32')
            bias_min = mx.np.min(bias).astype('float32')
            bias_max = mx.np.max(bias).astype('float32')
            bias_range = maxabs(bias_min, bias_max)

            # The fp32 layer gets the bias rescaled by
            # data_scale * weight_scale / bias_scale, truncated to integers;
            # the quantized layer below gets the raw int8 bias and its range.
            bias_scale = int8_range / bias_range
            data_scale = quantized_range / data_range
            weight_scale = int8_range / weight_range
            bias_int32_rescale = data_scale * weight_scale / bias_scale
            new_bias = bias.astype('float32') * bias_int32_rescale
            new_args['bias'] = new_bias.astype('int32').astype('float32')

        fc_fp32.load_dict(new_args, cast_dtype=True, dtype_source='saved')
        output = fc_fp32(data)

        class QuantFC(mx.gluon.nn.HybridBlock):
            """Block that holds int8 weights/bias plus their float ranges and
            forwards them to `npx.quantized_fully_connected`."""
            def __init__(self, num_hidden, use_bias, flatten, **kwargs):
                super(QuantFC, self).__init__(**kwargs)
                self.use_bias = use_bias
                self._kwargs = {'num_hidden': num_hidden, 'no_bias': not use_bias, 'flatten': flatten}

                self.min_data = mx.gluon.Parameter('min_data', dtype='float32', shape=(1), allow_deferred_init=True)
                self.max_data = mx.gluon.Parameter('max_data', dtype='float32', shape=(1), allow_deferred_init=True)

                # weight_shape comes from the enclosing check.
                self.weight = mx.gluon.Parameter('weight', dtype='int8', shape=weight_shape, allow_deferred_init=True)
                self.min_weight = mx.gluon.Parameter('min_weight', dtype='float32', shape=(1), allow_deferred_init=True)
                self.max_weight = mx.gluon.Parameter('max_weight', dtype='float32', shape=(1), allow_deferred_init=True)

                if use_bias:
                    self.bias = mx.gluon.Parameter('bias', dtype='int8', shape=(num_hidden,), allow_deferred_init=True)
                    self.min_bias = mx.gluon.Parameter('min_bias', dtype='float32', shape=(1), allow_deferred_init=True)
                    self.max_bias = mx.gluon.Parameter('max_bias', dtype='float32', shape=(1), allow_deferred_init=True)

            def forward(self, x):
                # Move every parameter to the device of the input before use.
                device = x.device
                weight = self.weight.data().to_device(device)
                bias = self.bias.data().to_device(device) if self.use_bias else None
                min_data = self.min_data.data().to_device(device)
                max_data = self.max_data.data().to_device(device)
                min_weight = self.min_weight.data().to_device(device)
                max_weight = self.max_weight.data().to_device(device)
                min_bias = self.min_bias.data().to_device(device) if self.use_bias else None
                max_bias = self.max_bias.data().to_device(device) if self.use_bias else None
                out = npx.quantized_fully_connected(data=x, weight=weight, bias=bias,
                                                    min_data=min_data, max_data=max_data,
                                                    min_weight=min_weight, max_weight=max_weight,
                                                    min_bias=min_bias, max_bias=max_bias,
                                                    **self._kwargs)
                return out

        fc_int8 = QuantFC(num_hidden=num_hidden, use_bias=use_bias, flatten=flatten)
        qargs = {
            'weight': new_args['weight'].astype('int8'),
            'min_data': mx.np.array([-data_range]),
            'max_data': mx.np.array([data_range]),
            'min_weight': mx.np.array([-weight_range]),
            'max_weight': mx.np.array([weight_range])
        }
        if use_bias:
            qargs.update({
                'bias': bias.astype('int8'),
                'min_bias': mx.np.array([-bias_range]),
                'max_bias': mx.np.array([bias_range]),
            })

        fc_int8.load_dict(qargs, cast_dtype=True, dtype_source='saved')
        qoutput, min_range, max_range = fc_int8(data.astype(qdtype))

        if use_bias:
            # with bias added, no element may differ from the fp32 result by more than 2
            diff = mx.np.abs(output - qoutput.astype(output.dtype))
            cond = mx.np.less(2, diff).sum().item()
            assert cond == 0
        else:
            assert_almost_equal(output.asnumpy(), qoutput.asnumpy())

    for qdtype in ['int8', 'uint8']:
        # The flatten=False variants are exercised only in the oneDNN configuration.
        if is_test_for_dnnl():
            check_quantized_fc((32, 512, 2), 100, False, qdtype, flatten=False)
            check_quantized_fc((32, 512, 2), 100, True, qdtype, flatten=False)
            check_quantized_fc((32, 512, 2, 2), 100, False, qdtype, flatten=False)
            check_quantized_fc((32, 512, 2, 2), 100, True, qdtype, flatten=False)
        check_quantized_fc((32, 512, 2, 2), 100, False, qdtype)
        check_quantized_fc((32, 111, 2, 2), 100, False, qdtype)
        check_quantized_fc((32, 512, 2, 2), 100, True, qdtype)
        check_quantized_fc((32, 111, 2, 2), 100, True, qdtype)
        check_quantized_fc((256, 2048, 2, 2), 800, True, qdtype)
        check_quantized_fc((256, 111, 2, 2), 800, True, qdtype)
        check_quantized_fc((256, 2048, 2, 2), 800, False, qdtype)
        check_quantized_fc((256, 111, 2, 2), 800, False, qdtype)
@use_np
def test_quantized_transpose():
    """Compare npx.quantized_transpose with mx.np.transpose for int8 and uint8 data."""

    def check_quantized_transpose(shape, qdtype, axes):
        low, high = get_low_high(qdtype)
        # Cast through qdtype first so the fp32 reference and the quantized
        # input hold the same values.
        samples = mx.np.random.uniform(low=low, high=high, size=shape)
        data = samples.astype(qdtype).astype('float32')
        min_data = mx.np.array([mx.np.min(data).astype('float32').item()])
        max_data = mx.np.array([mx.np.max(data).astype('float32').item()])

        expected = mx.np.transpose(data, axes=axes)
        qoutput, min_output, max_output = npx.quantized_transpose(
            data.astype(qdtype), min_data, max_data, axes=axes)

        assert_almost_equal(expected.asnumpy(), qoutput.asnumpy())
        # The returned range must match the range that was passed in.
        assert_almost_equal(min_output.item(), min_data.item())
        assert_almost_equal(max_output.item(), max_data.item())

    # (input shape, axes permutation)
    shape_axes_cases = [
        ((), ()),
        ((2, 3), (1, 0)),
        ((8, 21), (1, 0)),
        ((7, 3, 9), (2, 1, 0)),
        ((5, 3, 6, 8), (2, 3, 0, 1)),
    ]
    for qtype in ['int8', 'uint8']:
        for shape, axes in shape_axes_cases:
            check_quantized_transpose(shape, qtype, axes)
@use_np
def test_quantized_embedding():
    """Compare npx.quantized_embedding with a gluon Embedding that holds the same weights."""

    def check_quantized_embedding(data_shape, input_dim, output_dim):
        if is_test_for_gpu():
            print('skipped testing test_quantized_embedding for gpu since it is not supported yet')
            return

        # Indices are drawn as floats and truncated to whole numbers via int32.
        indices = mx.np.random.uniform(low=0, high=input_dim, size=data_shape)
        data = indices.astype('int32').astype('float32')

        embedding_fp32 = mx.gluon.nn.Embedding(input_dim=input_dim, output_dim=output_dim)
        embedding_fp32.initialize()
        # One forward pass plus a sync before the parameter shape is read.
        embedding_fp32(data)
        npx.waitall()
        weight_shape = embedding_fp32.collect_params()['weight'].shape

        # Whole-number weights inside the int8 range, so the later int8 cast is lossless.
        int8_range = 127.0
        weight = mx.np.random.uniform(low=-int8_range, high=int8_range, size=weight_shape)
        weight = weight.astype('int32').astype('float32')
        embedding_fp32.load_dict({'weight': weight}, cast_dtype=True, dtype_source='saved')
        output = embedding_fp32(data)

        weight_min = mx.np.min(weight).astype('float32')
        weight_max = mx.np.max(weight).astype('float32')
        # Symmetric range: the larger of |min| and |max|.
        weight_range = mx.np.maximum(mx.np.abs(weight_min), mx.np.abs(weight_max))

        class QuantEmbedding(mx.gluon.nn.HybridBlock):
            """Minimal block that forwards its parameters to npx.quantized_embedding."""

            def __init__(self, input_dim=input_dim, output_dim=output_dim, **kwargs):
                super().__init__(**kwargs)
                self._kwargs = {'input_dim': input_dim, 'output_dim': output_dim}
                self.weight = mx.gluon.Parameter('weight', dtype='float32', shape=weight_shape,
                                                 allow_deferred_init=True)
                self.min_weight = mx.gluon.Parameter('min_weight', dtype='float32', shape=(1),
                                                     allow_deferred_init=True)
                self.max_weight = mx.gluon.Parameter('max_weight', dtype='float32', shape=(1),
                                                     allow_deferred_init=True)

            def forward(self, x):
                device = x.device
                return npx.quantized_embedding(data=x,
                                               weight=self.weight.data().to_device(device),
                                               min_weight=self.min_weight.data().to_device(device),
                                               max_weight=self.max_weight.data().to_device(device),
                                               **self._kwargs)

        embedding_int8 = QuantEmbedding(input_dim=input_dim, output_dim=output_dim)
        embedding_int8.load_dict(
            {
                'weight': weight.astype('int8'),
                'min_weight': mx.np.array([-weight_range]),
                'max_weight': mx.np.array([weight_range]),
            },
            cast_dtype=True, dtype_source='saved')
        # The operator also returns an output range; only the values are checked.
        qoutput, _, _ = embedding_int8(data)
        assert_almost_equal(output.asnumpy(), qoutput.asnumpy())

    # (data shape, input_dim, output_dim)
    for case in [((1,), 1000, 256),
                 ((1,), 1024, 512),
                 ((32,), 1000, 256),
                 ((32,), 1024, 512)]:
        check_quantized_embedding(*case)
@use_np
def test_quantized_flatten():
    """Check that npx.quantized_flatten yields a 2-D view of the data and keeps the range."""

    def check_quantized_flatten(shape, qdtype):
        # uint8 data is drawn from [0, 127], int8 data from [-127, 127].
        data_low = 0.0 if qdtype == 'uint8' else -127.0
        data_high = 127.0
        qdata = mx.np.random.uniform(low=data_low, high=data_high, size=shape).astype(qdtype)
        min_data = mx.np.array([-1023.343], dtype='float32')
        max_data = mx.np.array([2343.324275], dtype='float32')

        qoutput, min_output, max_output = npx.quantized_flatten(qdata, min_data, max_data)

        # Leading axis is kept, every remaining axis is folded into the second one.
        assert qoutput.ndim == 2
        assert qoutput.shape[0] == qdata.shape[0]
        assert qoutput.shape[1] == onp.prod(qdata.shape[1:])
        # Element order and the min/max range must be untouched.
        assert same(qdata.asnumpy().flatten(), qoutput.asnumpy().flatten())
        assert same(min_data.asnumpy(), min_output.asnumpy())
        assert same(max_data.asnumpy(), max_output.asnumpy())

    shapes = [(10,), (10, 15), (10, 15, 18), (3, 4, 23, 23)]
    for qdtype in ['int8', 'uint8']:
        for shape in shapes:
            check_quantized_flatten(shape, qdtype)
@use_np
def test_quantized_act():
    """Compare npx.quantized_act (relu) with the fp32 gluon Activation block."""

    def check_quantized_act(data_shape, qdtype):
        # Guard clauses for the backend/dtype combinations that are skipped.
        if is_test_for_native_cpu():
            print('skipped testing quantized_act for native cpu since it is not supported yet')
            return
        if qdtype == 'int8' and is_test_for_dnnl():
            print('skipped testing quantized_act for oneDNN cpu int8 since it is not supported yet')
            return
        if is_test_for_gpu():
            print('skipped testing quantized_act for gpu since it is not supported yet')
            return

        act_fp32 = mx.gluon.nn.Activation(activation='relu')
        # uint8 data is drawn from [0, 127], int8 data from [-127, 127].
        data_low = 0.0 if qdtype == 'uint8' else -127.0
        data_high = 127.0
        samples = mx.np.random.uniform(low=data_low, high=data_high, size=data_shape)
        data = samples.astype(qdtype).astype('float32')
        output = act_fp32(data)

        class QuantActivation(mx.gluon.nn.HybridBlock):
            """Minimal block that forwards its inputs to npx.quantized_act."""

            def __init__(self, activation, **kwargs):
                super().__init__(**kwargs)
                self._kwargs = {'act_type': activation}

            def forward(self, x, min_data, max_data):
                return npx.quantized_act(data=x, min_data=min_data, max_data=max_data,
                                         **self._kwargs)

        quantized_act = QuantActivation(activation='relu')
        qdata = data.astype(qdtype)
        quantized_range_min = mx.np.array([mx.np.min(data).astype('float32').item()])
        quantized_range_max = mx.np.array([mx.np.max(data).astype('float32').item()])
        qoutput, min_range, max_range = quantized_act(qdata, quantized_range_min,
                                                      quantized_range_max)

        assert_almost_equal(output.asnumpy(), qoutput.asnumpy())
        # The returned range must match the range that was passed in.
        assert_almost_equal(min_range.item(), quantized_range_min.item())
        assert_almost_equal(max_range.item(), quantized_range_max.item())

    shapes = [(10,), (10, 15), (10, 15, 18), (3, 4, 23, 23)]
    for qdtype in ['int8', 'uint8']:
        for shape in shapes:
            check_quantized_act(shape, qdtype)
@use_np
def test_quantized_bn():
    """Compare a BatchNorm block quantized via quantize_net with its fp32 original."""

    def get_mean_var(data):
        """Return the mean and variance of `data` reduced over every axis except axis 1."""
        reduce_axes = [axis for axis in range(data.ndim) if axis != 1]
        mean = mx.np.mean(data, axis=reduce_axes)
        # Re-insert singleton axes at positions 0, 2 and 3, then expand to data's shape.
        mean_broad = mean
        for axis in (0, 2, 3):
            mean_broad = mx.np.expand_dims(mean_broad, axis=axis)
        mean_broad = mx.npx.broadcast_like(mean_broad, data)
        centered = data - mean_broad
        var = mx.np.mean(mx.np.multiply(centered, centered), axis=reduce_axes)
        return mean, var

    def check_quantized_bn(data_shape, qdtype):
        if is_test_for_native_cpu():
            print('skipped testing quantize_bn for native cpu since it is not supported yet')
            return
        if is_test_for_gpu():
            print('skipped testing quantize_bn for gpu since it is not supported yet')
            return

        data_low, data_high = get_low_high(qdtype)

        # run fp32 bn
        bn_fp32 = mx.gluon.nn.BatchNorm(use_global_stats=True, scale=True)
        warmup = mx.np.random.uniform(low=data_low, high=data_high, size=data_shape)
        bn_fp32.initialize()
        bn_fp32.hybridize()
        # First forward pass, run before the parameter shapes are read below.
        bn_fp32(warmup)
        fp32_params = bn_fp32.collect_params()

        data = mx.np.random.uniform(low=data_low, high=data_high, size=data_shape)
        gamma = mx.np.random.uniform(low=data_low, high=data_high,
                                     size=fp32_params['gamma'].shape)
        beta = mx.np.random.uniform(low=data_low, high=data_high,
                                    size=fp32_params['beta'].shape)
        running_mean, running_var = get_mean_var(data)
        bn_fp32.load_dict({
            'gamma': gamma,
            'beta': beta,
            'running_mean': running_mean,
            'running_var': running_var,
        })
        output = bn_fp32(data)

        # generate int8 bn from fp32 bn
        calib_data = mx.gluon.data.DataLoader(data, batch_size=data_shape[0])
        quant_bn = mx.contrib.quant.quantize_net(bn_fp32,
                                                 quantized_dtype=qdtype,
                                                 quantize_mode='full',
                                                 calib_data=calib_data,
                                                 calib_mode='naive',
                                                 num_calib_batches=1,
                                                 device=mx.current_device())
        output_int8_to_fp32 = quant_bn(data)

        assert_almost_equal(output.asnumpy(), output_int8_to_fp32.asnumpy(), rtol=1e-1, atol=8)

    shapes = [(32, 512, 4, 4), (32, 1024, 8, 8), (32, 3, 224, 224)]
    for qdtype in ['int8', 'uint8']:
        for shape in shapes:
            check_quantized_bn(shape, qdtype)
def test_quantized_reshape():
    """Check output shape, element order and range pass-through of npx.quantized_reshape."""
    # Each case: (input shape, newshape argument, reverse flag, expected output shape)
    test_cases = [
        ((2, 3, 5, 5), (-2, -1), False, (2, 75)),
        ((2, 3, 5, 5), (-2, -2, -1), False, (2, 3, 25)),
        ((5, 3, 4, 5), (-2, -1, -2), False, (5, 15, 4)),
        ((2, 3, 5, 4), (-1, -2, -2), False, (8, 3, 5)),
        ((2, 3, 5, 5), (-2, -2, -2, -2), False, (2, 3, 5, 5)),
        ((2, 1, 4, 5), (-2, -3, -2, -2), False, (2, 4, 5)),
        ((1, 1, 4, 1), (-3, -3, -2, -2), False, (4, 1)),
        ((1, 1, 1, 1), (-3, -3, -3, -3), False, ()),
        ((2, 4, 5, 3), (-1, 2, 2, 1), False, (30, 2, 2, 1)),
        ((2, 3, 5, 6), (-4,), False, (2, 3, 5, 6)),
        ((2, 3, 5, 6), (6, 1, -4), False, (6, 1, 5, 6)),
        ((2, 3, 5, 6), (-5, -5), False, (6, 30)),
        ((2, 3, 5, 6), (-5, -1), False, (6, 30)),
        ((64,), (-6, 16, 4), False, (16, 4)),
        ((64,), (-6, 16, -1), False, (16, 4)),
        ((64, 1, 2, 3), (-6, 16, -1, -4), False, (16, 4, 1, 2, 3)),
        ((8, 5, 4, 6), (-4, -1, 3, -6), True, (8, 5, 4, 2, 3)),
    ]

    def check_quantized_reshape(shape, qdtype, newshape, reverse, expected_ret_shape):
        low, high = get_low_high(qdtype)
        qdata = mx.np.random.uniform(low=low, high=high, size=shape).astype(qdtype)
        min_data = mx.np.array([-1023.343], dtype='float32')
        max_data = mx.np.array([2343.324275], dtype='float32')

        qoutput, min_output, max_output = npx.quantized_reshape(
            qdata, min_data, max_data, newshape=newshape, reverse=reverse)

        assert qoutput.shape == expected_ret_shape
        # Reshaping must neither reorder elements nor touch the range.
        assert same(qdata.asnumpy().flatten(), qoutput.asnumpy().flatten())
        assert same(min_data.asnumpy(), min_output.asnumpy())
        assert same(max_data.asnumpy(), max_output.asnumpy())

    for qdtype in ['int8', 'uint8']:
        for case in test_cases:
            shape, newshape, reverse, expected_ret_shape = case
            check_quantized_reshape(shape, qdtype, newshape, reverse, expected_ret_shape)
def test_quantize_params():
    """Check which parameter names _quantize_params keeps and which it replaces."""
    if is_test_for_native_cpu():
        print('skipped testing quantized_params for native cpu since it is not supported yet')
        return

    data = mx.sym.Variable('data')
    conv = mx.sym.Convolution(data, kernel=(1, 1), num_filter=2048, name='conv')
    sym = mx.sym.BatchNorm(data=conv, eps=2e-05, fix_gamma=False, momentum=0.9,
                           use_global_stats=False, name='bn')

    # Every argument that is neither the data input nor a label.
    offline_params = [arg for arg in sym.list_arguments()
                      if not (arg.startswith('data') or arg.endswith('label'))]
    params = {arg: mx.nd.uniform(shape=(2, 2)) for arg in offline_params}

    qsym, _ = mx.contrib.quant._quantize_symbol(sym, device=mx.current_device(),
                                                offline_params=offline_params,
                                                quantize_mode='full')
    qparams = mx.contrib.quant._quantize_params(qsym, params, min_max_dict={})

    for qname in qparams:
        if qname.startswith('bn'):
            # bn parameters keep their original names.
            assert qname in params
        elif qname.startswith('conv'):
            # conv parameters appear only under a new name containing 'quantize'.
            assert qname not in params
            assert 'quantize' in qname
class FP32Net(mx.gluon.nn.HybridBlock):
    """Small fp32 network: conv -> batch norm -> relu -> average pool -> dense -> softmax."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.conv = mx.gluon.nn.Conv2D(channels=16, kernel_size=(1, 1))
        self.bn = mx.gluon.nn.BatchNorm(epsilon=2e-05, scale=True, momentum=0.9,
                                        use_global_stats=False)
        self.act = mx.gluon.nn.Activation(activation='relu')
        self.pool = mx.gluon.nn.AvgPool2D(pool_size=(4, 4))
        self.fc = mx.gluon.nn.Dense(units=10, flatten=True)

    def forward(self, x):
        # Apply the layers in sequence, then normalise with softmax.
        features = x
        for layer in (self.conv, self.bn, self.act, self.pool, self.fc):
            features = layer(features)
        return npx.softmax(features)
class FP32MultipleOutputs(mx.gluon.nn.HybridBlock):
    """Network that splits its input along axis 1 and runs one shared conv on each slice."""

    def __init__(self, length, **kwargs):
        super().__init__(**kwargs)
        # Number of slices produced by the split in forward().
        self.length = length
        self.convs = mx.gluon.nn.Conv2D(channels=16, kernel_size=(1, 1))
        self.fc = mx.gluon.nn.Dense(units=10, flatten=True)

    def forward(self, x):
        slices = npx.slice_channel(x, num_outputs=self.length, axis=1, squeeze_axis=1)
        # Convolve each slice and give it a leading axis so the results can be concatenated.
        branches = [mx.np.expand_dims(self.convs(slices[i]), axis=0)
                    for i in range(self.length)]
        merged = mx.np.concatenate(branches)
        merged = mx.np.reshape(merged, (self.length, -1))
        return npx.softmax(self.fc(merged))
class FP32MultipleInputs(mx.gluon.nn.HybridBlock):
    """Two-input network: each input goes through its own conv + batch norm, then both are summed."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.conv1 = mx.gluon.nn.Conv2D(channels=64, kernel_size=(1, 1), use_bias=False)
        self.bn1 = mx.gluon.nn.BatchNorm()
        self.conv2 = mx.gluon.nn.Conv2D(channels=64, kernel_size=(1, 1), use_bias=False)
        self.bn2 = mx.gluon.nn.BatchNorm()

    def forward(self, data0, data1):
        first_branch = self.bn1(self.conv1(data0))
        second_branch = self.bn2(self.conv2(data1))
        return second_branch + first_branch
@use_np
@xfail_when_nonstandard_decimal_separator
def test_quantize_model():
    """Run quantize_model on single-input, multi-output and multi-input networks.

    Each network is quantized twice, once with calib_mode='none' and once with
    calib_mode='naive'. Both times the returned parameters are compared with the
    expected ones; after the calibrated run the quantized symbol's attributes
    are checked as well.
    """

    def check_params(params, qparams, qsym=None):
        """Assert that `qparams` equals the expected parameter dict.

        Without `qsym` the expected dict is `params` itself. With `qsym` it is
        the result of running `_quantize_params` on `params`.
        """
        if qsym is None:
            expected = params
        else:
            expected = mx.contrib.quant._quantize_params(qsym, params, min_max_dict={})
        assert len(qparams) == len(expected)
        for k, v in expected.items():
            assert k in qparams
            assert same(v.asnumpy(), qparams[k].asnumpy())

    def check_qsym_calibrated(qsym):
        """Assert every attribute entry named '*requantize_*' carries calibration ranges."""
        for k, v in qsym.attr_dict().items():
            if 'requantize_' in k:
                assert 'min_calib_range' in v
                assert 'max_calib_range' in v

    def check_qsym_qdtype(qsym, qdtype):
        """Assert every attribute entry named '*_quantize*' has out_type == qdtype."""
        for k, v in qsym.attr_dict().items():
            if '_quantize' in k:
                assert 'out_type' in v
                assert v['out_type'] == qdtype

    def skip_not_supported(qdtype):
        """Return True (after printing why) when this backend/dtype pair is skipped.

        `qdtype` is an explicit argument so the helper does not depend on the
        loop variable of the enclosing function.
        """
        if is_test_for_native_cpu():
            print('skipped testing quantize_model for native cpu since it is not supported yet')
            return True
        if qdtype == 'int8' and is_test_for_dnnl():
            print('skipped testing quantize_model for oneDNN cpu int8 since it is not supported yet')
            return True
        if qdtype == 'uint8' and is_test_for_gpu():
            print('skipped testing quantize_model for gpu uint8 since it is not supported yet')
            return True
        return False

    def quantize(sym, arg_params, aux_params, qdtype, **calib_options):
        """Call quantize_model with the arguments shared by every case in this test."""
        return mx.contrib.quant.quantize_model(sym=sym,
                                               arg_params=arg_params,
                                               aux_params=aux_params,
                                               device=mx.current_device(),
                                               quantized_dtype=qdtype,
                                               quantize_mode='full',
                                               **calib_options)

    def check_quantize_model(qdtype):
        if skip_not_supported(qdtype):
            return

        standard_net = FP32Net()
        standard_net.initialize()
        batch_size = 4
        data_shape = (batch_size, 4, 10, 10)

        length = batch_size  # specify num of outputs from split op
        multi_out_net = FP32MultipleOutputs(length)
        multi_out_net.initialize()
        multi_out_data_shape = (length, 4, 4, 10, 10)

        for net, dshape in zip((standard_net, multi_out_net), (data_shape, multi_out_data_shape)):
            data = mx.np.random.uniform(low=0, high=1, size=dshape)
            net.hybridize()
            # Forward pass before the symbol is exported.
            net(data)
            sym, _ = net.export(None)
            arg_params, aux_params = collect_block_args_aux(net, sym)

            # Quantize without calibration.
            qsym, qarg_params, qaux_params = quantize(sym, arg_params, aux_params, qdtype,
                                                      calib_mode='none')
            check_params(arg_params, qarg_params, qsym)
            check_params(aux_params, qaux_params)

            # Quantize with naive calibration on one random batch.
            calib_data = mx.np.random.uniform(size=dshape)
            calib_data = mx.gluon.data.DataLoader(calib_data, batch_size=batch_size)
            qsym, qarg_params, qaux_params = quantize(sym, arg_params, aux_params, qdtype,
                                                      calib_mode='naive',
                                                      calib_data=calib_data,
                                                      num_calib_batches=1)
            check_params(arg_params, qarg_params, qsym)
            check_params(aux_params, qaux_params)
            check_qsym_calibrated(qsym)
            check_qsym_qdtype(qsym, qdtype)

    def check_quantize_model_multiple_inputs(qdtype):
        if skip_not_supported(qdtype):
            return

        net = FP32MultipleInputs()
        net.initialize()
        net.hybridize()
        dshape = (64, 4, 10, 10)
        data = [mx.np.random.uniform(low=0, high=1, size=dshape),
                mx.np.random.uniform(low=0, high=1, size=dshape)]
        # Forward pass before the symbol is exported.
        net(*data)
        sym, _ = net.export(None)
        arg_params, aux_params = collect_block_args_aux(net, sym)

        # Quantize without calibration.
        qsym, qarg_params, qaux_params = quantize(sym, arg_params, aux_params, qdtype,
                                                  calib_mode='none')
        check_params(arg_params, qarg_params, qsym)
        check_params(aux_params, qaux_params)

        # Quantize with naive calibration; the loader yields one array per network input.
        calib_data = [mx.np.random.uniform(size=dshape),
                      mx.np.random.uniform(size=dshape)]
        calib_data = mx.gluon.data.DataLoader(mx.gluon.data.ArrayDataset(*calib_data),
                                              batch_size=4)
        qsym, qarg_params, qaux_params = quantize(sym, arg_params, aux_params, qdtype,
                                                  calib_mode='naive',
                                                  calib_data=calib_data,
                                                  data_names=["data0", "data1"],
                                                  num_calib_batches=1)
        check_params(arg_params, qarg_params, qsym)
        check_params(aux_params, qaux_params)
        check_qsym_calibrated(qsym)
        check_qsym_qdtype(qsym, qdtype)

    for qdtype in ['int8', 'uint8']:
        check_quantize_model(qdtype)
        check_quantize_model_multiple_inputs(qdtype)
@use_np
def test_quantize_gluon_with_forward():
    """Quantize a pretrained resnet18_v1 with quantize_net and run a forward pass.

    The network is quantized once without calibration, then once for every
    combination of calibration mode ('naive', 'entropy') and quantization
    granularity ('tensor-wise', 'channel-wise'). The test only checks that
    quantization and inference finish; no output values are compared.
    """

    def check_quantize_net(qdtype):
        if is_test_for_native_cpu():
            print('skipped testing test_quantize_gluon_with_forward for native cpu since it is not supported yet')
            return
        if is_test_for_gpu():
            print('skipped testing test_quantize_gluon_with_forward for gpu since it is not supported yet')
            return

        data_shape = (32, 3, 224, 224)
        batch_size = 1
        resnet18_v1 = vision.resnet18_v1(pretrained=True)
        resnet18_v1.reset_device(mx.current_device())

        excluded_names_match = []
        # Never taken while GPU runs return early above; kept so the exclusions
        # are already in place if the GPU skip is removed.
        if mx.current_device() == mx.gpu():
            excluded_names_match += ['activation', 'relu', 'conv0']

        num_calib_batches = 1
        random_data = mx.np.random.uniform(size=data_shape)
        calib_data = mx.gluon.data.DataLoader(random_data, batch_size=batch_size)

        # Quantization without calibration.
        quantized_resnet18_v1 = mx.contrib.quant.quantize_net(resnet18_v1, quantized_dtype=qdtype,
                                                              exclude_layers=None,
                                                              exclude_layers_match=excluded_names_match,
                                                              calib_mode='none',
                                                              data_shapes=[data_shape],
                                                              device=mx.current_device())
        quantized_resnet18_v1.hybridize(static_alloc=True, static_shape=True)
        quantized_resnet18_v1(random_data)

        for mode in ['naive', 'entropy']:
            # 'naive' uses the requested dtype, 'entropy' uses 'auto'. A separate
            # name is used so the `qdtype` argument is not overwritten and the
            # choice does not depend on the order of the modes.
            calib_qdtype = qdtype if mode == 'naive' else 'auto'
            for quantize_granularity in ['tensor-wise', 'channel-wise']:
                quantized_resnet18_v1 = mx.contrib.quant.quantize_net(resnet18_v1,
                                                                      quantized_dtype=calib_qdtype,
                                                                      exclude_layers=None,
                                                                      exclude_layers_match=excluded_names_match,
                                                                      calib_data=calib_data,
                                                                      calib_mode=mode,
                                                                      quantize_granularity=quantize_granularity,
                                                                      num_calib_batches=num_calib_batches,
                                                                      device=mx.current_device())
                quantized_resnet18_v1.hybridize(static_alloc=True, static_shape=True)
                quantized_resnet18_v1(random_data)

    for qdtype in ['int8', 'uint8']:
        check_quantize_net(qdtype)
@xfail_when_nonstandard_decimal_separator
def test_quantize_sym_with_calib():
    """Check that _calibrate_quantized_sym writes the given thresholds onto requantize ops.

    A small fp32 symbol is quantized, calibrated with a hand-made min/max
    dictionary, and the min_calib_range / max_calib_range attributes of the
    requantize nodes are compared with the dictionary values.
    """
    if is_test_for_native_cpu():
        print('skipped testing quantize_sym_with_calib for native cpu since it is not supported yet')
        return

    def get_fp32_sym():
        data = mx.sym.Variable('data')
        conv = mx.sym.Convolution(data, kernel=(1, 1), num_filter=16, name='conv')
        bn = mx.sym.BatchNorm(data=conv, eps=2e-05, fix_gamma=False, momentum=0.9,
                              use_global_stats=False, name='bn')
        act = mx.sym.Activation(data=bn, act_type='relu', name='relu')
        pool = mx.sym.Pooling(act, kernel=(4, 4), pool_type='avg', name='pool')
        fc = mx.sym.FullyConnected(pool, num_hidden=10, flatten=True, name='fc')
        return mx.sym.softmax(fc, name='softmax')

    sym = get_fp32_sym()
    # Every argument that is neither the data input nor a label.
    offline_params = [name for name in sym.list_arguments()
                      if not name.startswith('data') and not name.endswith('label')]
    qsym, _ = mx.contrib.quant._quantize_symbol(sym, device=mx.current_device(),
                                                offline_params=offline_params,
                                                quantize_mode='full')

    # Random (min, max) thresholds keyed by the output name of the calibrated layer.
    min_max_dict = {'conv_output': (onp.random.uniform(low=100.0, high=200.0),
                                    onp.random.uniform(low=100.0, high=200.0)),
                    'fc_output': (onp.random.uniform(low=100.0, high=200.0),
                                  onp.random.uniform(low=100.0, high=200.0))}
    # Requantize op name -> key of its thresholds in min_max_dict.
    op_name_to_th_name = {'requantize_conv': 'conv_output', 'requantize_fc': 'fc_output'}

    cqsym = mx.contrib.quant._calibrate_quantized_sym(qsym, min_max_dict)
    attr_dict = cqsym.attr_dict()

    for name, th_name in op_name_to_th_name.items():
        assert name in attr_dict
        expected_min, expected_max = min_max_dict[th_name]
        # Attributes are stored as strings and parsed back to float for comparison.
        lhs = float(attr_dict[name]['min_calib_range'])
        assert_almost_equal(onp.array([lhs]), onp.array([expected_min]))
        lhs = float(attr_dict[name]['max_calib_range'])
        assert_almost_equal(onp.array([lhs]), onp.array([expected_max]), rtol=1e-3, atol=1e-4)
@use_np
def test_quantization_net_with_different_data_inputs_options():
    """Run quantize_net with each supported way of describing the input data.

    The three variants are a list of shape tuples, a list of DataDesc objects,
    and a DataLoader. Each quantized network is run once.
    """
    if is_test_for_native_cpu():
        print('skipped testing test_quantization_net_with_different_data_inputs_options for native cpu since it is not supported yet')
        return
    if is_test_for_gpu():
        print('skipped testing test_quantization_net_with_different_data_inputs_options for gpu since it is not supported yet')
        return

    def make_net():
        fresh = FP32Net()
        fresh.initialize()
        return fresh

    def quantize_and_run(fp32_net, sample, **input_options):
        quantized = mx.contrib.quant.quantize_net(fp32_net,
                                                  quantized_dtype='auto',
                                                  device=mx.current_device(),
                                                  **input_options)
        result = quantized(sample)
        result.wait_to_read()

    net = make_net()
    batch_size = 32
    data_shape = (batch_size, 3, 224, 224)
    random_data = mx.np.random.uniform(size=data_shape)

    # pass data_shapes as list of tuples
    quantize_and_run(net, random_data, data_shapes=[data_shape])

    # pass data_shapes as list of DataDescs
    net2 = make_net()
    data_desc = mx.io.DataDesc('data', data_shape)
    quantize_and_run(net2, random_data, data_shapes=[data_desc])

    # pass data as DataLoader
    net3 = make_net()
    data_loader = mx.gluon.data.DataLoader(random_data, batch_size=batch_size)
    quantize_and_run(net3, random_data, calib_data=data_loader)
def test_optimal_threshold_adversarial_case():
    """Check get_optimal_threshold on a histogram whose whole mass sits in the last bin."""
    # The worst case for the optimal_threshold function is when the values are concentrated
    # at one edge: [0, 0, ..., 1000]. (histogram)
    # We want to make sure that the optimal threshold in this case is the max.
    min_val = -2
    max_val = 2
    # 998 empty bins followed by one bin holding all 1000 samples.
    hist = [0] * 998 + [1000]
    # 999 evenly spaced edges starting at min_val, closed off by max_val.
    hist_edges = [(max_val - min_val) / 999 * i + min_val for i in range(0, 999)]
    hist_edges.append(max_val)
    hist_data = (hist, hist_edges, min_val, max_val, max_val)

    for dtype in ['uint8', 'int8', 'auto']:
        res = mx.contrib.quant._LayerHistogramCollector.get_optimal_threshold(
            hist_data, dtype, num_quantized_bins=5)
        # The threshold should be 2.
        print(res)
        assert abs(res[2] - 2) < 1e-5
def test_get_optimal_thresholds():
    """Check get_optimal_thresholds on uniformly distributed data."""
    # Given an ndarray with elements following a uniform distribution, the optimal threshold
    # for quantizing the ndarray should be either abs(min(nd)) or abs(max(nd)).

    def get_threshold(nd):
        abs_min = mx.nd.abs(mx.nd.min(nd))
        abs_max = mx.nd.abs(mx.nd.max(nd))
        return mx.nd.maximum(abs_min, abs_max).asnumpy()

    for dtype in ['uint8', 'int8', 'auto']:
        nd = mx.nd.uniform(low=-10.532, high=11.3432, shape=(8, 3, 23, 23), dtype=onp.float64)
        expected_threshold = get_threshold(nd)

        # Build the histogram tuple in the layout consumed by the collector:
        # (hist, hist_edges, min, max, symmetric threshold).
        values = nd.asnumpy()
        min_range = onp.min(values)
        max_range = onp.max(values)
        th = max(abs(min_range), abs(max_range))
        hist, hist_edges = onp.histogram(values, bins=8001, range=(-th, th))
        hist_dict = {'layer1': (hist, hist_edges, min_range, max_range, th)}

        min_max_dict = mx.contrib.quant._LayerHistogramCollector.get_optimal_thresholds(
            hist_dict, dtype)

        assert 'layer1' in min_max_dict
        found = onp.array([min_max_dict['layer1'][1]])
        assert_almost_equal(found, expected_threshold, rtol=1e-2, atol=1e-4)
@use_np
def test_rnn_quantization():
    """Quantize a gluon LSTM with quantize_net and compare it with the fp32 block.

    The exported quantized symbol must contain a "quantized_rnn" node, and the
    mean squared error between each fp32 and quantized output must be below 0.001.
    """
    data_low = -1
    data_high = 1

    def check_rnn_quantization(num_layers, bidirectional, seq_len, batch_size, input_dim, state_size):
        data_shape = (seq_len, batch_size, input_dim)

        rnn_fp32 = mx.gluon.rnn.LSTM(hidden_size=state_size,
                                     num_layers=num_layers,
                                     bidirectional=bidirectional)
        data = mx.np.random.uniform(low=data_low, high=data_high, size=data_shape)
        states_shape = (num_layers * 2 if bidirectional else num_layers, batch_size, state_size)
        # Two initial state tensors, matching the two states unpacked from the
        # output below. The count used to be `batch_size`, which only worked
        # because every call in this test uses batch_size == 2.
        states = [mx.np.zeros(states_shape) for _ in range(2)]

        rnn_fp32.initialize()
        rnn_fp32.hybridize()
        ref_out = rnn_fp32(data, states)

        class RNNDataLoader(mx.gluon.data.DataLoader):
            """Calibration loader that yields the same [data, states] pair on every step."""

            def __init__(self, data, states):
                # The base loader is built over an empty dataset; batches come from __next__.
                super().__init__(mx.gluon.data.SimpleDataset([]), 1)
                self.data = data
                self.states = states

            def __iter__(self):
                return self

            def __next__(self):
                # Never raises StopIteration; the consumer has to stop on its own.
                return [self.data, self.states]

            def __bool__(self):
                # NOTE(review): `dataiter` is not assigned anywhere in this class.
                # Unless the DataLoader base provides it, this raises
                # AttributeError when evaluated. Confirm whether the override
                # is still needed.
                return bool(self.dataiter.iter_next())

        calib_data = RNNDataLoader(data, states)
        quant_rnn = mx.contrib.quant.quantize_net(rnn_fp32,
                                                  quantized_dtype='auto',
                                                  quantize_mode='full',
                                                  calib_data=calib_data,
                                                  calib_mode='naive',
                                                  num_calib_batches=1,
                                                  device=mx.current_device())
        qout = quant_rnn(data, states)

        qsym, _ = quant_rnn.export(None)
        assert qsym.tojson().find("quantized_rnn") != -1

        # Flatten (output, [state0, state1]) so it can be indexed like qout.
        ref_out = [ref_out[0], ref_out[1][0], ref_out[1][1]]
        for i, quantized in enumerate(qout):
            mse = onp.mean((ref_out[i].asnumpy() - quantized.asnumpy())**2)
            assert mse < 0.001

    check_rnn_quantization(1, False, 5, 2, 16, 16)
    check_rnn_quantization(1, True, 5, 2, 16, 16)
@use_np
def test_quantized_rnn():
    """Compare npx.contrib_quantized_rnn on uint8 data with npx.rnn on the fp32 data."""

    def check_quantized_rnn(num_layers, bidirectional, seq_len, batch_size, input_dim, state_size):
        ndir = 2 if bidirectional else 1
        # Length of the flat parameter vector: first layer plus any further layers.
        size = ndir * state_size * 4
        first_lyr_param_size = (input_dim + state_size + 2) * size
        other_lyr_param_size = (state_size * ndir + state_size + 2) * size
        full_param_size = first_lyr_param_size + (num_layers - 1) * other_lyr_param_size

        state_shape = (num_layers * ndir, batch_size, state_size)
        data = mx.np.random.uniform(-1, 1, (seq_len, batch_size, input_dim))
        state = mx.np.random.uniform(-1, 1, state_shape)
        state_cell = mx.np.random.uniform(0, 1, state_shape)
        params = mx.np.random.normal(0, 1, (full_param_size,))

        # Arguments shared by the fp32 and the quantized operator.
        lstm_kwargs = dict(parameters=params,
                           mode='lstm',
                           state=state,
                           state_size=state_size,
                           state_cell=state_cell,
                           num_layers=num_layers,
                           bidirectional=bidirectional)

        out = npx.rnn(data=data, **lstm_kwargs)

        # Affine quantization of the data to uint8: q = data * scale + shift, rounded.
        data_min = mx.np.min(data)
        data_max = mx.np.max(data)
        data_scale = mx.np.array(128.0 / (data_max - data_min)).reshape((1,))
        data_shift = mx.np.array(128.0 - data_max * data_scale).reshape((1,))
        qdata = (data * data_scale + data_shift + 0.5).astype('uint8')

        qout = npx.contrib_quantized_rnn(data=qdata,
                                         data_scale=data_scale,
                                         data_shift=data_shift,
                                         **lstm_kwargs)

        mse = onp.mean((out.asnumpy() - qout.asnumpy())**2)
        assert mse < 0.001

    for is_bidirectional in (False, True):
        check_quantized_rnn(1, is_bidirectional, 5, 2, 16, 16)