blob: e4960e5b1a4de39bd0e92d3cf44ba7edef3928f9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=unused-argument
"""
ONNX testcases
================
This file is a test script to test Relax ONNX frontend coverage.
"""
from typing import Dict, List, Literal, Optional
import numpy as np
import onnx
import onnxruntime
import pytest
from onnx import ModelProto, TensorProto, helper
import tvm
import tvm.testing
from tvm import relax
from tvm.relax.frontend.onnx import from_onnx
from tvm.script import relax as R
from tvm.script import tir as T
from tvm.script import ir as I
bg = np.random.MT19937(0)
rg = np.random.Generator(bg)
def generate_random_inputs(
model: ModelProto, inputs: Optional[Dict[str, np.ndarray]] = None
) -> Dict[str, np.ndarray]:
input_values = {}
# Iterate through model inputs and extract their shape.
for i in model.graph.input:
if inputs is not None and i.name in inputs and inputs[i.name] is not None:
input_values[i.name] = inputs[i.name]
continue
shape = []
for dim in i.type.tensor_type.shape.dim:
shape.append(dim.dim_value)
input_values[i.name] = generate_random_value(shape, i.type.tensor_type.elem_type)
return input_values
def generate_random_value(shape, elem_type) -> np.ndarray:
# Extract datatype for the input.
if elem_type:
dtype = str(helper.tensor_dtype_to_np_dtype(elem_type))
else:
dtype = "float32"
# Generate random inputs for each input.
if dtype == "bool":
# random_value = np.random.choice(a=[False, True], size=shape)
random_value = rg.choice(a=[False, True], size=shape)
elif dtype.startswith("int"):
# Keep non-zero values
random_value = rg.integers(low=-63, high=63, size=shape).astype(dtype)
random_value[random_value <= 0] -= 1
else:
random_value = rg.standard_normal(size=shape).astype(dtype)
return random_value
def check_correctness(
model: ModelProto,
inputs: Optional[Dict[str, np.ndarray]] = None,
ir_version: int = 8,
opset: int = 14,
rtol: float = 1e-7,
atol: float = 1e-5,
check_dtypes: bool = False,
) -> None:
"""Run an onnx model in both onnxruntime and TVM through our importer
confirm that the results match. Otherwise, an exception will be raised.
Parameters
----------
model: ModelProto
The input onnx model that should be tested.
inputs: Optional[Dict[str, np.ndarray]]
An optional dictionary containing values for each input in the onnx model.
ir_version: int
Which version of the onnx IR to use.
opset: int
The opset version to use for the onnx importer.
atol: float
Set the tolerance of correctness checking. Some ops may be show more
arithmetic variance than others.
check_dtypes: bool
Check if data types are the same.
"""
# Configure model format.
if ir_version is not None:
model.ir_version = ir_version
if opset is not None:
model.opset_import[0].version = opset
# If inputs are not provided, extract them from the onnx graph and produce random
# values that we'll use for testing.
inputs = generate_random_inputs(model, inputs)
# Run the model through onnx to get the expected result.
ort_session = onnxruntime.InferenceSession(
model.SerializeToString(), providers=["CPUExecutionProvider"]
)
ort_output = ort_session.run([], inputs)
# Convert the onnx model into relax through the onnx importer.
tvm_model = from_onnx(model, opset=opset, keep_params_in_input=True)
# Convert operators for inference mode.
tvm_model = relax.transform.DecomposeOpsForInference()(tvm_model)
# Legalize any relax ops into tensorir.
tvm_model = relax.transform.LegalizeOps()(tvm_model)
# Separate model from parameters.
tvm_model, params = relax.frontend.detach_params(tvm_model)
# Compile the relax graph into a VM then run.
with tvm.transform.PassContext(opt_level=3):
ex = tvm.compile(tvm_model, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
# Prepare inputs.
input_list = [
inputs[key.name_hint] for key in tvm_model["main"].params if key.name_hint in inputs
]
if params:
input_list += params["main"]
# Run model and check outputs.
vm.set_input("main", *input_list)
vm.invoke_stateful("main")
tvm_output = vm.get_outputs("main")
# Wrap as a list if there is only one output.
if len(ort_output) == 1:
# Do not check the output number for TVM
# As for sequence output, the TVM output is a Tuple
# while the ONNX output number is one, which is a list
tvm_output = [tvm_output]
def _get_numpy_subdtype(narray):
if np.issubdtype(narray.dtype, np.integer):
return "integer"
elif np.issubdtype(narray.dtype, np.floating):
return "floating"
elif np.issubdtype(narray.dtype, np.bool_):
return "bool"
elif np.issubdtype(narray.dtype, np.complexfloating):
return "complexfloating"
else:
return "other"
def _check_output(tvm_out, ort_out):
if isinstance(tvm_out, tuple) and isinstance(ort_out, (tvm.runtime.ShapeTuple, list)):
assert len(tvm_out) == len(ort_out), "Unequal number of outputs"
for tvm_out_i, ort_out_i in zip(tvm_out, ort_out):
_check_output(tvm_out_i, ort_out_i)
elif isinstance(tvm_out, tvm.runtime.Tensor) and isinstance(ort_out, np.ndarray):
if check_dtypes:
assert tvm_out.numpy().dtype == ort_out.dtype
tvm.testing.assert_allclose(tvm_out.numpy(), ort_out, rtol=rtol, atol=atol)
elif isinstance(tvm_out, tvm.runtime.ShapeTuple) and isinstance(ort_out, np.ndarray):
shape_out = tvm.runtime.tensor([int(i) for i in tvm_out])
if check_dtypes:
assert _get_numpy_subdtype(shape_out.numpy()) == _get_numpy_subdtype(ort_out)
tvm.testing.assert_allclose(shape_out.numpy(), ort_out, rtol=rtol, atol=atol)
elif isinstance(tvm_out, (int, float, bool)) and isinstance(ort_out, np.ndarray):
if check_dtypes:
assert _get_numpy_subdtype(np.array(tvm_out)) == _get_numpy_subdtype(ort_out)
tvm.testing.assert_allclose(np.array(tvm_out), ort_out, rtol=rtol, atol=atol)
else:
raise ValueError(f"Unsupported types: {type(tvm_out)}, {type(ort_out)}")
# Check that number of outputs match.
assert len(tvm_output) == len(ort_output), "Unequal number of outputs"
for tvm_out, ort_out in zip(tvm_output, ort_output):
# TODO Allow configurable tolerance.
if ort_out is not None:
_check_output(tvm_out, ort_out)
@pytest.mark.parametrize(
"input_names, expected_names",
[
([".", "123"], ["_", "input_123"]),
([".", "_"], ["_", "__1"]),
(["123", "input_123"], ["input_123", "input_123_1"]),
],
)
def test_sanitize(input_names, expected_names):
node = helper.make_node("Add", inputs=input_names, outputs=["output"])
graph = helper.make_graph(
[node],
"test",
inputs=[
helper.make_tensor_value_info(str(var), TensorProto.FLOAT, [32, 32])
for var in input_names
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, [32, 32]),
],
)
model = helper.make_model(graph, producer_name="test_sanitizer")
tvm_model = from_onnx(model)
for i, param in enumerate(tvm_model["main"].params):
assert param.name_hint == expected_names[i]
def verify_unary(
op_name,
shape,
attrs={},
domain=None,
input_dtype=TensorProto.FLOAT,
output_dtype=TensorProto.FLOAT,
opset=14,
):
test_node = helper.make_node(op_name, ["x"], ["y"], **attrs, domain=domain)
graph = helper.make_graph(
[test_node],
"elemwise_test",
inputs=[
helper.make_tensor_value_info("x", input_dtype, shape),
],
outputs=[helper.make_tensor_value_info("y", output_dtype, shape)],
)
model = helper.make_model(graph, producer_name="elemwise_test")
check_correctness(model, opset=opset)
def verify_unary_dynamic_shape(
op_name,
shape,
shape_instance,
attrs={},
domain=None,
input_dtype=TensorProto.FLOAT,
output_dtype=TensorProto.FLOAT,
opset=14,
):
test_node = helper.make_node(op_name, ["x"], ["y"], **attrs, domain=domain)
graph = helper.make_graph(
[test_node],
"elemwise_test",
inputs=[
helper.make_tensor_value_info("x", input_dtype, shape),
],
outputs=[helper.make_tensor_value_info("y", output_dtype, shape)],
)
model = helper.make_model(graph, producer_name="elemwise_test")
inputs = {"x": generate_random_value(shape_instance, input_dtype)}
check_correctness(model, inputs, opset=opset)
def verify_binary(
op_name, shape_a, shape_b, shape_c, attrs={}, domain=None, dtype=TensorProto.FLOAT, opset=14
):
test_node = helper.make_node(op_name, ["a", "b"], ["c"], **attrs, domain=domain)
graph = helper.make_graph(
[test_node],
"binary_test",
inputs=[
helper.make_tensor_value_info("a", dtype, shape_a),
helper.make_tensor_value_info("b", dtype, shape_b),
],
outputs=[helper.make_tensor_value_info("c", dtype, shape_c)],
)
model = helper.make_model(graph, producer_name="binary_test")
check_correctness(model, opset=opset, check_dtypes=True)
def verify_binary_scalar(op_name, attrs={}, domain=None, dtype=TensorProto.INT32, opset=14):
a = make_constant_node("a", dtype, [], [4])
b = make_constant_node("b", dtype, [], [8])
test_node = helper.make_node(op_name, ["a", "b"], ["c"], **attrs, domain=domain)
graph = helper.make_graph(
[a, b, test_node],
"binary_test",
inputs=[],
outputs=[helper.make_tensor_value_info("c", dtype, ())],
)
model = helper.make_model(graph, producer_name="binary_test")
check_correctness(model, opset=opset, check_dtypes=True)
def verify_compare(op_name, shape, attrs={}, domain=None):
test_node = helper.make_node(op_name, ["a", "b"], ["c"], **attrs, domain=domain)
graph = helper.make_graph(
[test_node],
"compare_test",
inputs=[
helper.make_tensor_value_info("a", TensorProto.FLOAT, shape),
helper.make_tensor_value_info("b", TensorProto.FLOAT, shape),
],
outputs=[helper.make_tensor_value_info("c", TensorProto.BOOL, shape)],
)
model = helper.make_model(graph, producer_name="compare_test")
check_correctness(model)
def verify_ternary(op_name, shape_a, shape_b, shape_c, shape_d, attrs={}, domain=None):
test_node = helper.make_node(op_name, ["a", "b", "c"], ["d"], **attrs, domain=domain)
graph = helper.make_graph(
[test_node],
"ternary_test",
inputs=[
helper.make_tensor_value_info("a", TensorProto.FLOAT, shape_a),
helper.make_tensor_value_info("b", TensorProto.FLOAT, shape_b),
helper.make_tensor_value_info("c", TensorProto.FLOAT, shape_c),
],
outputs=[helper.make_tensor_value_info("d", TensorProto.FLOAT, shape_d)],
)
model = helper.make_model(graph, producer_name="ternary_test")
check_correctness(model)
@pytest.mark.parametrize("dynamic", [True, False])
def test_matmul(dynamic):
matmul_node = helper.make_node("MatMul", ["a", "b"], ["c"])
a_shape = [32, 48]
b_shape = [48, 64]
output_shape = [32, 64]
if dynamic:
a_shape = ["?", "?"]
graph = helper.make_graph(
[matmul_node],
"matmul_test",
inputs=[
helper.make_tensor_value_info("a", TensorProto.FLOAT, a_shape),
],
initializer=[
helper.make_tensor(
"b", TensorProto.FLOAT, b_shape, np.random.normal(size=b_shape).astype("float32")
)
],
outputs=[helper.make_tensor_value_info("c", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="matmul_test")
inputs = None
if dynamic:
inputs = {
"a": np.random.normal(size=[32, 48]).astype("float32"),
}
check_correctness(model, inputs)
def test_concat():
verify_binary("Concat", [1, 32], [1, 32], [2, 32], attrs={"axis": 0})
@pytest.mark.parametrize("op_name", ["Add", "Sub", "Mul", "Div", "Pow"])
def test_binary(op_name: str):
verify_binary(op_name, [1, 32], [1, 32], [1, 32])
verify_binary_scalar(op_name)
@pytest.mark.parametrize("int_mode", [True, False])
def test_mod(int_mode: bool):
if int_mode:
dtype, fmod = TensorProto.INT32, 0
else:
dtype, fmod = TensorProto.FLOAT, 1
verify_binary("Mod", [1, 32], [1, 32], [1, 32], attrs={"fmod": fmod}, dtype=dtype)
verify_binary_scalar("Mod", attrs={"fmod": fmod}, dtype=dtype)
@pytest.mark.parametrize("num_inputs", [1, 2, 4])
@pytest.mark.parametrize("op_name", ["Min", "Max", "Sum", "Mean"])
def test_multi_input(op_name: str, num_inputs: int):
input_shape = [32, 32]
input_var = ["i" + str(i) for i in range(num_inputs)]
input_values = [
helper.make_tensor_value_info(var, TensorProto.FLOAT, input_shape) for var in input_var
]
test_node = helper.make_node(op_name, input_var, ["c"])
graph = helper.make_graph(
[test_node],
"multi_input_test",
inputs=input_values,
outputs=[helper.make_tensor_value_info("c", TensorProto.FLOAT, input_shape)],
)
model = helper.make_model(graph, producer_name="multi_input_test")
check_correctness(model)
@pytest.mark.parametrize("op_name", ["Less", "LessOrEqual", "Greater", "GreaterOrEqual"])
def test_compare(op_name: str):
verify_compare(op_name, [1, 32])
@pytest.mark.parametrize("op_name", ["And", "Or", "Xor"])
def test_binary_bool(op_name: str):
verify_binary(op_name, [32, 32], [32, 32], [32, 32], dtype=TensorProto.BOOL)
@pytest.mark.parametrize("op_name", ["BitwiseAnd", "BitwiseOr", "BitwiseXor"])
def test_bitwise(op_name: str):
verify_binary(op_name, [32, 32], [32, 32], [32, 32], dtype=TensorProto.UINT64, opset=18)
def test_bitwise_not():
verify_unary(
"BitwiseNot",
[32, 32],
input_dtype=TensorProto.UINT64,
output_dtype=TensorProto.UINT64,
opset=18,
)
@pytest.mark.parametrize("direction", ["LEFT", "RIGHT"])
def test_bitwise_shift(direction: str):
shape = [32, 32]
dtype = TensorProto.UINT64
test_node = helper.make_node("BitShift", ["a", "b"], ["c"], direction=direction)
graph = helper.make_graph(
[test_node],
"binary_test",
inputs=[
helper.make_tensor_value_info("a", dtype, shape),
helper.make_tensor_value_info("b", dtype, shape),
],
outputs=[helper.make_tensor_value_info("c", dtype, shape)],
)
model = helper.make_model(graph, producer_name="binary_test")
check_correctness(model, inputs={"b": np.random.randint(0, 8, shape).astype("uint64")})
@pytest.mark.parametrize(
"op_name",
[
"Sin",
"Cos",
"Tan",
"Sinh",
"Cosh",
"Tanh",
# "Asin", // TODO @jikechao, fix the precision loss due to the Taylor approximation
# "Acos",
# "Atan",
"Asinh",
"Acosh",
"Atanh",
"Neg",
"Abs",
"Log",
"Exp",
"Not",
"Reciprocal",
"Floor",
"Ceil",
"Round",
"IsInf",
"IsNaN",
"Sqrt",
"Relu",
"Elu",
"HardSwish",
"Sign",
"Softplus",
"Softsign",
"Erf",
"Sigmoid",
"Softmax",
"LogSoftmax",
"Hardmax",
"Identity",
],
)
def test_unary(op_name: str):
input_dtype = TensorProto.FLOAT
if op_name in [
"IsNaN",
"IsInf",
]:
pytest.skip(f"Skipping test {op_name} because current LegalizeOps does not support it.")
elif op_name == "Not":
input_dtype = TensorProto.BOOL
output_dtype = TensorProto.BOOL
else:
output_dtype = TensorProto.FLOAT
verify_unary(op_name, [8, 8, 8], input_dtype=input_dtype, output_dtype=output_dtype)
@pytest.mark.parametrize("from_type", [TensorProto.INT32, TensorProto.FLOAT, TensorProto.FLOAT16])
@pytest.mark.parametrize("to_type", [TensorProto.INT32, TensorProto.FLOAT, TensorProto.FLOAT16])
def test_cast(from_type, to_type):
cast_node = helper.make_node("Cast", ["a"], ["a_float"], to=to_type)
graph = helper.make_graph(
[cast_node],
"cast_test",
inputs=[
helper.make_tensor_value_info("a", from_type, [1, 32]),
],
outputs=[helper.make_tensor_value_info("a_float", to_type, [1, 32])],
)
model = helper.make_model(graph, producer_name="cast_test")
check_correctness(model, opset=13)
def test_gather():
def _verify_gather(data_shape, indices, out_shape, axis=0):
gather_node = helper.make_node("Gather", ["data", "indices"], ["y"], axis=axis)
if isinstance(indices, (list, tuple)):
indices_shape = np.asarray(indices).shape
else:
indices_shape = []
graph = helper.make_graph(
[gather_node],
"gather_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, data_shape),
helper.make_tensor_value_info("indices", TensorProto.INT64, indices_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, out_shape)],
)
model = helper.make_model(graph, producer_name="gather_test")
input_values = {
"data": np.random.randn(*data_shape).astype("float32"),
"indices": np.array(indices).astype("int64"),
}
check_correctness(model, inputs=input_values)
_verify_gather([5, 4, 3, 2], [0, 1, 3], [3, 4, 3, 2])
_verify_gather([3], 0, [])
_verify_gather([3, 3], [[0, 2]], [3, 1, 2], 1)
@pytest.mark.parametrize(
"data_shape, indices_shape, axis",
[
([3, 4, 5], [1, 4, 5], 0),
([3, 4, 5], [3, 2, 5], 1),
([3, 4, 5], [3, 4, 2], 2),
],
)
def test_gather_elements(data_shape, indices_shape, axis):
gather_elements_node = helper.make_node("GatherElements", ["data", "indices"], ["y"], axis=axis)
graph = helper.make_graph(
[gather_elements_node],
"gather_elements_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, data_shape),
helper.make_tensor_value_info("indices", TensorProto.INT64, indices_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, indices_shape)],
)
model = helper.make_model(graph, producer_name="gather_elements_test")
input_values = {
"data": np.random.randn(*data_shape).astype("float32"),
"indices": np.random.randint(0, data_shape[axis], indices_shape).astype("int64"),
}
check_correctness(model, inputs=input_values)
@pytest.mark.parametrize(
"data_shape, indices_shape, batch_dims",
[
([2, 2], [2, 2], 0),
([2, 2], [2, 1], 0),
([2, 2, 2], [1], 0),
([2, 2, 2], [2, 2], 0),
([2, 2, 2], [2, 1, 2], 0),
([2, 2, 2], [2, 2], 1),
([2, 2, 2], [2, 1], 1),
],
)
def test_gather_nd(data_shape, indices_shape, batch_dims):
gather_nd_node = helper.make_node("GatherND", ["data", "indices"], ["y"], batch_dims=batch_dims)
graph = helper.make_graph(
[gather_nd_node],
"gather_nd_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, data_shape),
helper.make_tensor_value_info("indices", TensorProto.INT64, indices_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, None)],
)
model = helper.make_model(graph, producer_name="gather_nd_test")
input_values = {
"data": np.random.randn(*data_shape).astype("float32"),
"indices": np.random.randint(0, 2, indices_shape).astype("int64"),
}
check_correctness(model, inputs=input_values)
@pytest.mark.parametrize("axis", [0, 1, 2])
@pytest.mark.parametrize(("name", "opset"), [("Scatter", 10), ("ScatterElements", 11)])
def test_scatter(axis: int, name: str, opset: int):
if axis != 1:
pytest.skip("The current topi impl is wrong, which only works for axis=1")
input_shape = [16, 16, 16]
indices_shape = [8, 8, 8]
updates_shape = [8, 8, 8]
output_shape = [16, 16, 16]
node = helper.make_node(name, ["data", "indices", "updates"], ["output"], axis=axis)
graph = helper.make_graph(
[node],
"scatter_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, input_shape),
helper.make_tensor_value_info("indices", TensorProto.INT64, indices_shape),
helper.make_tensor_value_info("updates", TensorProto.FLOAT, updates_shape),
],
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="scatter_test")
indices = np.random.randint(0, 16, indices_shape)
check_correctness(model, inputs={"indices": indices}, opset=opset)
@pytest.mark.parametrize("reduction", ["none", "add", "mul"])
def test_scatter_nd(reduction):
def verify_scatter_nd(data_shape, indices_shape, updates_shape):
scatter_nd_node = helper.make_node(
"ScatterND",
["data", "indices", "updates"],
["output"],
reduction=reduction,
)
graph = helper.make_graph(
[scatter_nd_node],
"scatter_nd_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, data_shape),
helper.make_tensor_value_info("indices", TensorProto.INT64, indices_shape),
helper.make_tensor_value_info("updates", TensorProto.FLOAT, updates_shape),
],
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, data_shape)],
)
model = helper.make_model(graph, producer_name="scatter_nd_test")
indices = np.random.choice(data_shape[0], indices_shape)
check_correctness(model, inputs={"indices": indices}, opset=16)
verify_scatter_nd([8], [4, 1], [4])
verify_scatter_nd([4, 4, 4], [2, 1], [2, 4, 4])
verify_scatter_nd([4, 5, 6], [2, 3, 2], [2, 3, 6])
verify_scatter_nd([10], [5, 1], [5])
@pytest.mark.parametrize("tensor_shape", [[32, 32]])
@pytest.mark.parametrize("condition_shape", [None, [8], [16]])
@pytest.mark.parametrize("axis", [None, 0, 1])
def test_compress(
tensor_shape: List[int],
condition_shape: Optional[List[int]],
axis: Optional[int],
):
if condition_shape is None and axis is None:
pytest.skip("Either condition_shape or axis must be specified")
if condition_shape is None:
condition_shape = [tensor_shape[axis]]
compress_node = helper.make_node("Compress", ["tensor", "condition"], ["output"], axis=axis)
graph = helper.make_graph(
[compress_node],
"compress_test",
inputs=[
helper.make_tensor_value_info("tensor", TensorProto.FLOAT, tensor_shape),
helper.make_tensor_value_info("condition", TensorProto.BOOL, condition_shape),
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, [])
], # shape is unknown
)
model = helper.make_model(graph, producer_name="compress_test")
check_correctness(model, opset=11)
def test_size():
test_node = helper.make_node("Size", ["x"], ["y"])
graph = helper.make_graph(
[test_node],
"size_test",
inputs=[helper.make_tensor_value_info("x", TensorProto.FLOAT, [3, 3, 3])],
outputs=[helper.make_tensor_value_info("y", TensorProto.INT64, [3])],
)
model = helper.make_model(graph, producer_name="size_test")
check_correctness(model)
@pytest.mark.parametrize("k", [-1, 0, 1])
def test_eye_like(k: int):
verify_unary("EyeLike", [32, 32], attrs={"k": k})
@pytest.mark.parametrize("alpha", [None, 0.25, 1.0])
@pytest.mark.parametrize("beta", [None, 0.35, 1.0])
@pytest.mark.parametrize("useC", [False, True])
def test_gemm(alpha, beta, useC):
if useC:
gemm_node = helper.make_node(
"Gemm", ["a", "b", "c"], ["y"], alpha=alpha, beta=beta, transA=1, transB=1
)
else:
gemm_node = helper.make_node(
"Gemm", ["a", "b"], ["y"], alpha=alpha, beta=beta, transA=1, transB=1
)
inputs = [
helper.make_tensor_value_info("a", TensorProto.FLOAT, [4, 3]),
helper.make_tensor_value_info("b", TensorProto.FLOAT, [5, 4]),
]
if useC:
inputs.append(helper.make_tensor_value_info("c", TensorProto.FLOAT, [1, 5]))
graph = helper.make_graph(
[gemm_node],
"gemm_test",
inputs=inputs,
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, [3, 5])],
)
model = helper.make_model(graph, producer_name="gemm_test")
check_correctness(model)
@pytest.mark.parametrize(
"in_shape, shape, out_shape",
[
([7, 32, 32, 8], [224, 256], [224, 256]),
([7, 32, 32, 8], [-1, 8192], [7, 8192]),
([7, 32, 32, 8], [0, 32, 32, 8], [7, 32, 32, 8]),
],
)
def test_reshape(in_shape, shape, out_shape):
reshape_node = helper.make_node("Reshape", ["data", "shape"], ["reshaped"])
graph = helper.make_graph(
[reshape_node],
"reshape_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, in_shape),
],
initializer=[helper.make_tensor("shape", TensorProto.INT64, [len(shape)], shape)],
outputs=[helper.make_tensor_value_info("reshaped", TensorProto.FLOAT, out_shape)],
)
input_values = {
"data": np.random.randn(*in_shape).astype("float32"),
}
model = helper.make_model(graph, producer_name="reshape_test")
check_correctness(model, inputs=input_values)
def test_transpose():
verify_unary("Transpose", [32, 32, 32], attrs={"perm": [1, 2, 0]})
def test_unsqueeze():
unsqueeze_node = helper.make_node("Unsqueeze", ["a", "axes"], ["b"])
graph = helper.make_graph(
[unsqueeze_node],
"unsqueeze",
inputs=[helper.make_tensor_value_info("a", TensorProto.FLOAT, [32, 32])],
initializer=[helper.make_tensor("axes", TensorProto.INT64, [3], vals=[0, 2, 3])],
outputs=[helper.make_tensor_value_info("b", TensorProto.FLOAT, [1, 32, 1, 1, 32])],
)
model = helper.make_model(graph, producer_name="unsqueeze_test")
check_correctness(model)
def test_unsqueeze_v1():
# https://github.com/onnx/onnx/blob/main/docs/Changelog.md#Unsqueeze-1
unsqueeze_node = helper.make_node("Unsqueeze", ["a"], ["b"], axes=[0, 2, 3])
graph = helper.make_graph(
[unsqueeze_node],
"unsqueeze_v1",
inputs=[helper.make_tensor_value_info("a", TensorProto.FLOAT, [32, 32])],
outputs=[helper.make_tensor_value_info("b", TensorProto.FLOAT, [1, 32, 1, 1, 32])],
)
model = helper.make_model(
graph, producer_name="unsqueeze_v1_test", opset_imports=[helper.make_opsetid("", 6)]
)
check_correctness(model, opset=10)
def test_gelu():
verify_unary("Gelu", [32, 32], domain="com.microsoft")
def test_bias_gelu():
verify_binary("BiasGelu", [32, 32], [32], [32, 32], domain="com.microsoft")
def test_where():
where_node = helper.make_node("Where", ["a", "b", "c"], ["d"])
graph = helper.make_graph(
[where_node],
"where_test",
inputs=[
helper.make_tensor_value_info("a", TensorProto.BOOL, [32, 32]),
helper.make_tensor_value_info("b", TensorProto.FLOAT, [32, 32]),
helper.make_tensor_value_info("c", TensorProto.FLOAT, [32, 32]),
],
outputs=[helper.make_tensor_value_info("d", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="where_test")
check_correctness(model)
@pytest.mark.parametrize("min", [True, False])
@pytest.mark.parametrize("max", [True, False])
def test_clip(min, max):
if min and max:
clip_node = helper.make_node("Clip", ["input", "min", "max"], ["output"])
elif min:
clip_node = helper.make_node("Clip", ["input", "min"], ["output"])
elif max:
clip_node = helper.make_node("Clip", ["input", "max"], ["output"])
else:
clip_node = helper.make_node("Clip", ["input"], ["output"])
inputs = [helper.make_tensor_value_info("input", TensorProto.FLOAT, [32, 64])]
if min:
inputs.append(helper.make_tensor_value_info("min", TensorProto.FLOAT, ()))
if max:
inputs.append(helper.make_tensor_value_info("max", TensorProto.FLOAT, ()))
graph = helper.make_graph(
[clip_node],
"clip_test",
inputs=inputs,
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, [32, 64])],
)
model = helper.make_model(graph, producer_name="clip_test")
check_correctness(model)
@pytest.mark.parametrize("min", [-6.0, 0.0])
@pytest.mark.parametrize("max", [6.0])
def test_clip_v6(max, min):
# https://github.com/onnx/onnx/blob/main/docs/Changelog.md#Clip-6
clip_node = helper.make_node("Clip", ["input"], ["output"], max=max, min=min)
inputs = [helper.make_tensor_value_info("input", TensorProto.FLOAT, [32, 64])]
graph = helper.make_graph(
[clip_node],
"clip_v6_test",
inputs=inputs,
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, [32, 64])],
)
model = helper.make_model(
graph, producer_name="clip_v6_test", opset_imports=[helper.make_opsetid("", 6)]
)
check_correctness(model, opset=10)
def test_equal():
equal_node = helper.make_node("Equal", ["a", "b"], ["output"])
graph = helper.make_graph(
[equal_node],
"equal_test",
inputs=[
helper.make_tensor_value_info("a", TensorProto.FLOAT, [32, 32]),
helper.make_tensor_value_info("b", TensorProto.FLOAT, [32, 32]),
],
outputs=[helper.make_tensor_value_info("output", TensorProto.BOOL, [32, 32])],
)
model = helper.make_model(graph, producer_name="equal_test")
check_correctness(
model, {"a": np.zeros([32, 32], dtype="float32"), "b": np.zeros([32, 32], dtype="float32")}
)
check_correctness(
model, {"a": np.ones([32, 32], dtype="float32"), "b": np.zeros([32, 32], dtype="float32")}
)
check_correctness(model)
def test_shape():
shape_node = helper.make_node("Shape", ["data"], ["output"])
graph = helper.make_graph(
[shape_node],
"shape_test",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, [3, 4, 5, 6]),
],
outputs=[helper.make_tensor_value_info("output", TensorProto.INT64, [4])],
)
model = helper.make_model(graph, producer_name="shape_test")
check_correctness(model)
@pytest.mark.parametrize("upper", [True, False])
def test_trilu(upper: bool):
verify_unary("Trilu", [3, 5, 5], attrs={"upper": upper})
@pytest.mark.parametrize("k_value", [-1, 0, 1])
def test_trilu_with_const_k(k_value: int):
"""test_trilu_with_const_k"""
input_shape = [2, 3, 3]
graph = helper.make_graph(
[
make_constant_node("k", onnx.TensorProto.INT64, [1], [k_value]),
helper.make_node("Trilu", inputs=["x", "k"], outputs=["y"]),
],
"trilu_graph",
inputs=[
helper.make_tensor_value_info("x", onnx.TensorProto.DOUBLE, input_shape),
],
outputs=[helper.make_tensor_value_info("y", onnx.TensorProto.DOUBLE, input_shape)],
)
model = helper.make_model(graph, producer_name="trilu_graph")
check_correctness(model)
def test_selu():
verify_unary("Selu", [3, 32, 32])
verify_unary("Selu", [3, 32, 32], attrs={"alpha": 0.25, "gamma": 0.3})
def test_mish():
verify_unary("Mish", [3, 32, 32], opset=18)
def test_prelu():
verify_binary("PRelu", [3, 32, 32], [1], [3, 32, 32])
def test_thresholded_relu():
verify_unary("ThresholdedRelu", [3, 32, 32])
verify_unary("ThresholdedRelu", [3, 32, 32], attrs={"alpha": -0.01})
def test_leakyrelu():
verify_unary("LeakyRelu", [32, 32])
verify_unary("LeakyRelu", [32, 32], attrs={"alpha": 0.2})
def test_hardsigmoid():
verify_unary("HardSigmoid", [32, 32])
verify_unary("HardSigmoid", [32, 32], attrs={"alpha": 0.3, "beta": 0.4})
verify_unary("HardSigmoid", [1, 3, 20, 20], attrs={"alpha": 0.5, "beta": 0.6})
def test_shrink():
verify_unary("Shrink", [32, 32])
verify_unary("Shrink", [32, 32], attrs={"lambd": 0.2, "bias": 0.1})
@pytest.mark.parametrize("stride", [1, 2])
@pytest.mark.parametrize("dilation", [1, 2])
@pytest.mark.parametrize("bias", [True, False])
@pytest.mark.parametrize("pad", [0, 2])
@pytest.mark.parametrize("auto_pad", ["SAME_UPPER", "SAME_LOWER", "VALID"])
def test_conv(stride: int, dilation: int, pad: int, bias: bool, auto_pad: str):
def _verify_conv(input_shape, weight_shape):
nd = len(weight_shape) - 2
if auto_pad == "VALID":
output_shape = [input_shape[0], weight_shape[0]] + [
(input_shape[i] - dilation * (weight_shape[i] - 1) - 1) // stride + 1
for i in range(2, len(input_shape))
]
bias_shape = [output_shape[1]]
conv_node = helper.make_node(
"Conv",
inputs=["x", "w"] + (["b"] if bias else []),
outputs=["y"],
strides=[stride] * nd,
dilations=[dilation] * nd,
auto_pad=auto_pad,
group=input_shape[1] // weight_shape[1],
)
elif auto_pad in ("SAME_UPPER", "SAME_LOWER"):
if dilation == 2:
# auto_pad = "SAME" and dilation = 2 is not supported in ONNX
return
output_shape = [input_shape[0], weight_shape[0]] + [
(input_shape[i] + stride - 1) // stride for i in range(2, len(input_shape))
]
bias_shape = [output_shape[1]]
conv_node = helper.make_node(
"Conv",
inputs=["x", "w"] + (["b"] if bias else []),
outputs=["y"],
strides=[stride] * nd,
dilations=[dilation] * nd,
auto_pad=auto_pad,
group=input_shape[1] // weight_shape[1],
)
else:
output_shape = [input_shape[0], weight_shape[0]] + [
(input_shape[i] + 2 * pad - dilation * (weight_shape[i] - 1) - 1) // stride + 1
for i in range(2, len(input_shape))
]
bias_shape = [output_shape[1]]
conv_node = helper.make_node(
"Conv",
inputs=["x", "w"] + (["b"] if bias else []),
outputs=["y"],
strides=[stride] * nd,
dilations=[dilation] * nd,
pads=[pad] * nd * 2,
group=input_shape[1] // weight_shape[1],
)
graph = helper.make_graph(
[conv_node],
"conv_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, input_shape),
helper.make_tensor_value_info("w", TensorProto.FLOAT, weight_shape),
]
+ ([helper.make_tensor_value_info("b", TensorProto.FLOAT, bias_shape)] if bias else []),
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="conv_test")
check_correctness(model, atol=1e-4)
# Conv1D
_verify_conv([3, 4, 32], [4, 4, 3])
_verify_conv([3, 4, 32], [2, 4, 3]) # group=2
# Conv2D
_verify_conv([3, 4, 32, 32], [4, 4, 3, 3])
_verify_conv([3, 4, 32, 32], [2, 4, 3, 3]) # group=2
# Conv3D
_verify_conv([3, 4, 32, 32, 32], [4, 4, 3, 3, 3])
_verify_conv([3, 4, 32, 32, 32], [2, 4, 3, 3, 3]) # group=2
@pytest.mark.parametrize("stride", [1, 2])
@pytest.mark.parametrize("dilation", [1])
@pytest.mark.parametrize("bias", [True, False])
@pytest.mark.parametrize("pad", [0, 2])
def test_conv_transpose(stride: int, dilation: int, pad: int, bias: bool):
def _verify_conv_transpose(input_shape, weight_shape):
nd = len(weight_shape) - 2
output_shape = [input_shape[0], weight_shape[0]] + [
(input_shape[i] - 1) * stride - 2 * pad + dilation * (weight_shape[i] - 1) + 1
for i in range(2, len(input_shape))
]
bias_shape = [output_shape[1]]
conv_node = helper.make_node(
"ConvTranspose",
inputs=["x", "w"] + (["b"] if bias else []),
outputs=["y"],
strides=[stride] * nd,
dilations=[dilation] * nd,
pads=[pad] * nd * 2,
group=input_shape[1] // weight_shape[1],
)
graph = helper.make_graph(
[conv_node],
"conv_transpose_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, input_shape),
helper.make_tensor_value_info("w", TensorProto.FLOAT, weight_shape),
]
+ ([helper.make_tensor_value_info("b", TensorProto.FLOAT, bias_shape)] if bias else []),
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="conv_transpose_test")
check_correctness(model, atol=1e-4)
# ConvTranspose1D
_verify_conv_transpose([3, 4, 32], [4, 4, 3])
_verify_conv_transpose([3, 4, 32], [4, 2, 3]) # group=2
# ConvTranspose2D
_verify_conv_transpose([3, 4, 32, 32], [4, 4, 3, 3])
_verify_conv_transpose([3, 4, 32, 32], [4, 2, 3, 3]) # group=2
def test_pow():
verify_binary("Pow", [32, 32], [32, 32], [32, 32])
@pytest.mark.parametrize("reverse", [True, False])
@pytest.mark.parametrize("exclusive", [False])
def test_cumsum(reverse, exclusive):
cumsum_node = helper.make_node(
"CumSum", ["x", "axis"], ["y"], reverse=reverse, exclusive=exclusive
)
shape = [32, 32]
graph = helper.make_graph(
[cumsum_node],
"cumsum_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, shape),
],
initializer=[helper.make_tensor("axis", TensorProto.INT64, (), [1])],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, shape)],
)
model = helper.make_model(graph, producer_name="cumsum_test")
check_correctness(model)
def test_cumsum1():
"""test_cumsum1"""
input_shape = [2, 3]
graph = helper.make_graph(
[
helper.make_node("CumSum", inputs=["X", "axis"], outputs=["Y"]),
],
"cumsum_graph",
inputs=[
helper.make_tensor_value_info("X", onnx.TensorProto.DOUBLE, input_shape),
helper.make_tensor_value_info("axis", onnx.TensorProto.INT32, [1], "axis"),
],
outputs=[helper.make_tensor_value_info("Y", onnx.TensorProto.DOUBLE, input_shape)],
)
model = helper.make_model(graph, producer_name="cumsum_graph")
check_correctness(model, inputs={"axis": np.array([0], dtype=np.int32)})
@pytest.mark.parametrize("axis", [[0, 2], None])
def test_squeeze(axis):
if axis:
squeeze_node = helper.make_node("Squeeze", ["x", "axes"], ["y"])
else:
squeeze_node = helper.make_node("Squeeze", ["x"], ["y"])
shape = [1, 32, 1, 32]
initializer = (
[helper.make_tensor("axes", TensorProto.INT64, [len(axis)], axis)] if axis else None
)
graph = helper.make_graph(
[squeeze_node],
"squeeze_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, shape),
],
initializer=initializer,
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="squeeze_test")
check_correctness(model, opset=13)
@pytest.mark.parametrize("axis", [[0, 2], None])
def test_squeeze_constant(axis):
shape = [1, 32, 1, 32]
constant = make_constant_node(
"x", onnx.TensorProto.FLOAT, shape, rg.standard_normal(size=shape).astype("float32")
)
if axis:
squeeze_node = helper.make_node("Squeeze", ["x", "axes"], ["y"])
else:
squeeze_node = helper.make_node("Squeeze", ["x"], ["y"])
initializer = (
[helper.make_tensor("axes", TensorProto.INT64, [len(axis)], axis)] if axis else None
)
graph = helper.make_graph(
[constant, squeeze_node],
"squeeze_test",
inputs=[],
initializer=initializer,
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="squeeze_test")
check_correctness(model, opset=13)
@pytest.mark.parametrize("axis", [[0]])
@pytest.mark.parametrize("A", [8, 16, 32])
@pytest.mark.parametrize("B", [8, 16, 32])
def test_dynamic_squeeze(axis, A, B):
squeeze_node = helper.make_node("Squeeze", ["x", "axes"], ["y"])
shape = [1, "A", "B"]
initializer = (
[helper.make_tensor("axes", TensorProto.INT64, [len(axis)], axis)] if axis else None
)
graph = helper.make_graph(
[squeeze_node],
"squeeze_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, shape),
],
initializer=initializer,
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, ["A", "B"])],
)
model = helper.make_model(graph, producer_name="squeeze_test")
inputs = {"x": rg.standard_normal(size=[1, A, B]).astype("float32")}
check_correctness(model, inputs, opset=13)
@pytest.mark.parametrize("axis", [[0]])
@pytest.mark.parametrize("A", [8, 16, 32])
def test_dynamic_shape_squeeze(axis, A):
shape_node = helper.make_node("Shape", ["x"], ["y"])
squeeze_node = helper.make_node("Squeeze", ["y", "axes"], ["z"])
shape = ["A"]
initializer = (
[helper.make_tensor("axes", TensorProto.INT64, [len(axis)], axis)] if axis else None
)
graph = helper.make_graph(
[shape_node, squeeze_node],
"squeeze_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, shape),
],
initializer=initializer,
outputs=[helper.make_tensor_value_info("z", TensorProto.INT64, [])],
)
model = helper.make_model(graph, producer_name="squeeze_test")
inputs = {"x": rg.standard_normal(size=[A]).astype("float32")}
check_correctness(model, inputs, opset=13)
def test_const():
shape = [32, 32]
const_node = helper.make_node(
"Constant",
[],
["y"],
value=helper.make_tensor(
"value", TensorProto.FLOAT, shape, np.random.rand(*shape).astype(np.float32).flatten()
),
)
graph = helper.make_graph(
[const_node],
"const_test",
inputs=[],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, shape)],
)
model = helper.make_model(graph, producer_name="const_test")
check_correctness(model)
def test_instance_norm():
verify_ternary(
"InstanceNormalization", [1, 3, 32, 32], [3], [3], [1, 3, 32, 32], attrs={"epsilon": 1e-12}
)
verify_ternary(
"InstanceNormalization", [1, 32, 32], [32], [32], [1, 32, 32], attrs={"epsilon": 1e-12}
)
def test_mean_variance_norm():
verify_unary("MeanVarianceNormalization", [1, 3, 32, 32])
verify_unary("MeanVarianceNormalization", [1, 3, 32, 32], attrs={"axes": (1, 2, 3)})
def test_layer_norm():
layer_norm_node = helper.make_node(
"LayerNormalization", ["input", "scale", "bias"], ["Y"], epsilon=1e-12
)
graph = helper.make_graph(
[layer_norm_node],
"layer_norm_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, [32, 32]),
helper.make_tensor_value_info("scale", TensorProto.FLOAT, [32]),
helper.make_tensor_value_info("bias", TensorProto.FLOAT, [32]),
],
outputs=[
helper.make_tensor_value_info("Y", TensorProto.FLOAT, [32, 32]),
],
)
model = helper.make_model(graph, producer_name="layer_norm_test")
check_correctness(model)
# Test case with no bias that is an optional input
layer_norm_node = helper.make_node(
"LayerNormalization", ["input", "scale"], ["Y"], epsilon=1e-12
)
graph = helper.make_graph(
[layer_norm_node],
"layer_norm_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, [32, 32]),
helper.make_tensor_value_info("scale", TensorProto.FLOAT, [32]),
],
outputs=[
helper.make_tensor_value_info("Y", TensorProto.FLOAT, [32, 32]),
],
)
model = helper.make_model(graph, producer_name="layer_norm_test")
check_correctness(model)
def test_layer_norm_with_nd_gamma_beta():
layer_norm_node = helper.make_node(
"LayerNormalization", ["input", "scale", "bias"], ["Y"], axis=1, epsilon=1e-12
)
graph = helper.make_graph(
[layer_norm_node],
"layer_norm_with_nd_gamma_beta_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, [1, 3, 4, 4]),
helper.make_tensor_value_info("scale", TensorProto.FLOAT, [3, 4, 4]),
helper.make_tensor_value_info("bias", TensorProto.FLOAT, [3, 4, 4]),
],
outputs=[
helper.make_tensor_value_info("Y", TensorProto.FLOAT, [1, 3, 4, 4]),
],
)
model = helper.make_model(graph, producer_name="layer_norm_with_nd_gamma_beta_test")
check_correctness(model)
# Test case with no bias that is an optional input
layer_norm_node = helper.make_node(
"LayerNormalization", ["input", "scale"], ["Y"], axis=1, epsilon=1e-12
)
graph = helper.make_graph(
[layer_norm_node],
"layer_norm_with_nd_gamma_beta_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, [32, 32]),
helper.make_tensor_value_info("scale", TensorProto.FLOAT, [32]),
],
outputs=[
helper.make_tensor_value_info("Y", TensorProto.FLOAT, [32, 32]),
],
)
model = helper.make_model(graph, producer_name="layer_norm_with_nd_gamma_beta_test")
check_correctness(model)
# TODO Enable dynamism
@pytest.mark.parametrize("dynamic", [False])
def test_skiplayernormalization(dynamic):
def verify_skiplayernormalization(input_, skip, gamma, beta, bias):
node = onnx.helper.make_node(
"SkipLayerNormalization",
inputs=["input", "skip", "gamma", "beta", "bias"],
outputs=["output", "mean", "std_dev"],
domain="com.microsoft",
)
node.attribute.append(onnx.helper.make_attribute("epsilon", 1e-4))
input_shape = list(input_.shape)
skip_shape = list(skip.shape)
gamma_shape = list(gamma.shape)
beta_shape = list(beta.shape)
bias_shape = list(bias.shape)
output_shape = list(input_.shape)
mean_shape = list([1])
std_dev_shape = list([1])
if dynamic:
input_shape = ["?" for _ in range(len(input_.shape))]
skip_shape = ["?" for _ in range(len(skip.shape))]
gamma_shape = ["?" for _ in range(len(gamma.shape))]
beta_shape = ["?" for _ in range(len(beta.shape))]
bias_shape = ["?" for _ in range(len(bias.shape))]
output_shape = ["?" for _ in range(len(input_.shape))]
graph = helper.make_graph(
[node],
"skiplayernormalization_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, input_shape),
helper.make_tensor_value_info("skip", TensorProto.FLOAT, skip_shape),
helper.make_tensor_value_info("gamma", TensorProto.FLOAT, gamma_shape),
helper.make_tensor_value_info("beta", TensorProto.FLOAT, beta_shape),
helper.make_tensor_value_info("bias", TensorProto.FLOAT, bias_shape),
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, output_shape),
helper.make_tensor_value_info("mean", TensorProto.FLOAT, mean_shape),
helper.make_tensor_value_info("std_dev", TensorProto.FLOAT, std_dev_shape),
],
)
model = helper.make_model(graph, producer_name="skiplayernormalization_test")
check_correctness(
model,
inputs={"input": input_, "skip": skip, "gamma": gamma, "beta": beta, "bias": bias},
)
hidden_size = 384
batch_size = 4
sequence_length = 4
dtype = "float32"
input_array = np.random.random((batch_size, sequence_length, hidden_size)).astype(dtype)
skip = np.random.random((batch_size, sequence_length, hidden_size)).astype(dtype)
gamma = np.random.uniform(0.5, 0.7, hidden_size).astype(dtype)
beta = np.random.randn(hidden_size).astype(dtype) * 0.1
bias = np.random.randn(hidden_size).astype(dtype)
verify_skiplayernormalization(input_array, skip, gamma, beta, bias)
def test_embedlayernormalization():
def verify_embedlayernormalization(
input_ids,
segment_ids,
word_embedding,
position_embedding,
segment_embedding,
gamma,
beta,
):
node = onnx.helper.make_node(
"EmbedLayerNormalization",
inputs=[
"input_ids",
"" if segment_ids is None else "segment_ids",
"word_embedding",
"position_embedding",
"" if segment_embedding is None else "segment_embedding",
"gamma",
"beta",
],
outputs=["output", "mask_index"],
domain="com.microsoft",
)
node.attribute.append(onnx.helper.make_attribute("epsilon", 1e-4))
segment_ids_shape = [] if segment_ids is None else segment_ids.shape
segment_embedding_shape = [] if segment_embedding is None else segment_embedding.shape
graph = helper.make_graph(
[node],
"embedlayernormalization_test",
inputs=[
helper.make_tensor_value_info(
"input_ids", TensorProto.INT32, list(input_ids.shape)
),
helper.make_tensor_value_info("segment_ids", TensorProto.INT32, segment_ids_shape),
helper.make_tensor_value_info(
"word_embedding", TensorProto.FLOAT, list(word_embedding.shape)
),
helper.make_tensor_value_info(
"position_embedding", TensorProto.FLOAT, list(position_embedding.shape)
),
helper.make_tensor_value_info(
"segment_embedding", TensorProto.FLOAT, segment_embedding_shape
),
helper.make_tensor_value_info("gamma", TensorProto.FLOAT, list(gamma.shape)),
helper.make_tensor_value_info("beta", TensorProto.FLOAT, list(beta.shape)),
],
outputs=[
helper.make_tensor_value_info(
"output", TensorProto.FLOAT, list((batch_size, sequence_length, hidden_size))
),
helper.make_tensor_value_info("mask_index", TensorProto.INT32, [batch_size]),
],
)
model = helper.make_model(graph, producer_name="embedlayernormalization_test")
inputs = {
"input_ids": input_ids,
"segment_ids": segment_ids,
"word_embedding": word_embedding,
"position_embedding": position_embedding,
"segment_embedding": segment_embedding,
"gamma": gamma,
"beta": beta,
}
check_correctness(model, inputs=inputs)
# TODO(@anwang2009): onnxruntime v1.9.0 requires empty list for optional argument,
# but v1.10.0+ requires None instead.
# verify_with_ort_with_inputs(
# model,
# [
# input_ids,
# np.empty(0, dtype="int32") if segment_ids is None else segment_ids,
# word_embedding,
# position_embedding,
# np.empty(0, dtype="float32") if segment_embedding is None else segment_embedding,
# gamma,
# beta,
# ],
# [
# (batch_size, sequence_length, hidden_size),
# batch_size,
# ],
# target=target,
# dev=dev,
# rtol=1e-4,
# atol=1e-4,
# )
hidden_size = 384
batch_size = 4
sequence_length = 3
vocab_size = 5
input_ids = np.full((batch_size, sequence_length), 3).astype("int32")
segment_ids = np.zeros((batch_size, sequence_length)).astype("int32")
word_embedding = np.full((vocab_size, hidden_size), 1).astype("float32")
position_embedding = np.full((sequence_length, hidden_size), 2).astype("float32")
segment_embedding = np.full((vocab_size, hidden_size), 3).astype("float32")
gamma = np.random.uniform(0.5, 0.7, hidden_size).astype("float32")
beta = np.random.randn(hidden_size).astype("float32") * 0.1
verify_embedlayernormalization(
input_ids, segment_ids, word_embedding, position_embedding, segment_embedding, gamma, beta
)
# Test with undefined segment embedding
verify_embedlayernormalization(
input_ids, None, word_embedding, position_embedding, None, gamma, beta
)
def create_reduce_test_parameters_axes_attr():
output = []
for value in [True, False]:
output.append(("ReduceMax", value, 11))
output.append(("ReduceMean", value, 13))
output.append(("ReduceMin", value, 11))
output.append(("ReduceProd", value, 13))
output.append(("ReduceSum", value, 11))
output.append(("ReduceSumSquare", value, 13))
output.append(("ReduceLogSum", value, 13))
output.append(("ReduceLogSumExp", value, 13))
output.append(("ReduceL1", value, 13))
output.append(("ReduceL2", value, 13))
return output
@pytest.mark.parametrize("func, dynamic, opset", create_reduce_test_parameters_axes_attr())
def test_all_reduce_funcs_axes_attr(func, dynamic, opset):
def verify_reduce_func(func, data, axis, keepdims):
inshape = data.shape
outshape = np.sum(data, axis=axis, keepdims=keepdims == 1).shape
if axis:
node = onnx.helper.make_node(
func, inputs=["x"], outputs=["y"], axes=axis, keepdims=keepdims
)
else:
node = onnx.helper.make_node(func, inputs=["x"], outputs=["y"], keepdims=keepdims)
if dynamic:
in_list = ["?" for _ in range(len(inshape))]
out_list = ["?" for _ in range(len(outshape))]
else:
in_list = list(inshape)
out_list = list(outshape)
graph = helper.make_graph(
[node],
"reduce_test",
inputs=[helper.make_tensor_value_info("x", TensorProto.FLOAT, in_list)],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, out_list)],
)
model = helper.make_model(graph, producer_name="reduce_test")
inputs_dict = {"x": data}
# Reduction ops accumulate arithmetic errors, so we use a higher tolerance.
check_correctness(model, inputs_dict, opset=opset, rtol=1e-4, atol=1e-4)
for keepdims in [True, False]:
verify_reduce_func(
func, np.random.randn(3, 2, 2).astype(np.float32), axis=None, keepdims=keepdims
)
verify_reduce_func(
func, np.random.randn(3, 2, 3).astype(np.float32), axis=None, keepdims=keepdims
)
verify_reduce_func(
func, np.random.randn(3, 3, 3).astype(np.float32), axis=(1,), keepdims=keepdims
)
verify_reduce_func(
func, np.random.randn(3, 3, 3, 1).astype(np.float32), axis=(1, 2), keepdims=keepdims
)
verify_reduce_func(
func, np.random.randn(3, 3, 3, 1).astype(np.float32), axis=(1,), keepdims=keepdims
)
verify_reduce_func(
func, np.random.randn(1, 3, 4, 1).astype(np.float32), axis=(1,), keepdims=keepdims
)
def create_reduce_test_parameters_axes_input():
output = []
for dynamic in [True, False]:
output.append(("ReduceMax", dynamic, 18))
output.append(("ReduceMean", dynamic, 18))
output.append(("ReduceMin", dynamic, 18))
output.append(("ReduceProd", dynamic, 18))
output.append(("ReduceSum", dynamic, 13))
output.append(("ReduceSumSquare", dynamic, 18))
output.append(("ReduceLogSum", dynamic, 18))
output.append(("ReduceLogSumExp", dynamic, 18))
output.append(("ReduceL1", dynamic, 18))
output.append(("ReduceL2", dynamic, 18))
return output
@pytest.mark.parametrize("func, dynamic, opset", create_reduce_test_parameters_axes_input())
def test_all_reduce_funcs_axes_input(func, dynamic, opset):
def verify_reduce_func(func, data, axes, keepdims, noop_with_empty_axes=False):
inshape = data.shape
inputs = ["x"]
initializers = []
# Optional `axes` input
if axes is not None:
axes_name = "reduce_axes"
axes_np = np.asarray(axes, dtype=np.int64)
axes_init = helper.make_tensor(
name=axes_name,
data_type=TensorProto.INT64,
dims=axes_np.shape,
vals=axes_np,
)
initializers.append(axes_init)
inputs.append(axes_name)
# Determine input and output shapes
if not axes and not noop_with_empty_axes:
outshape = np.sum(data, axis=None, keepdims=keepdims).shape
elif not axes and noop_with_empty_axes:
outshape = inshape
else:
outshape = np.sum(data, axis=axes, keepdims=keepdims).shape
if dynamic:
in_list = ["?"] * len(inshape)
out_list = ["?"] * len(outshape)
else:
in_list = list(inshape)
out_list = list(outshape)
# Make a model node
node = helper.make_node(
func,
inputs=inputs,
outputs=["y"],
keepdims=keepdims,
noop_with_empty_axes=noop_with_empty_axes,
)
# Make a model graph and a model
graph = helper.make_graph(
[node],
"reduce18_test",
inputs=[helper.make_tensor_value_info("x", TensorProto.FLOAT, in_list)],
initializer=initializers,
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, out_list)],
)
model = helper.make_model(graph, producer_name="reduce18_test")
# Run TVM importer vs onnxruntime
inputs_dict = {"x": data}
check_correctness(model, inputs_dict, opset=opset, rtol=1e-4, atol=1e-4)
# Verify
for keepdims in [True, False]:
# no `axes` input && `noop_with_empty_axes` = 0 -> reduce over all dimensions.
verify_reduce_func(
func,
np.random.randn(3, 2, 2).astype(np.float32),
axes=[],
keepdims=keepdims,
noop_with_empty_axes=False,
)
# no `axes` input && `noop_with_empty_axes` = 0 -> reduce over all dimensions.
verify_reduce_func(
func,
np.random.randn(3, 2, 2).astype(np.float32),
axes=None,
keepdims=keepdims,
noop_with_empty_axes=False,
)
# no `axes` input && `noop_with_empty_axes` = 1 -> return the input unchanged.
verify_reduce_func(
func,
np.random.randn(4, 3).astype(np.float32),
axes=[],
keepdims=keepdims,
noop_with_empty_axes=True,
)
# no `axes` input && `noop_with_empty_axes` = 1 -> return the input unchanged.
# (onnxruntime bug) Runtime error on the onnxruntime part
# verify_reduce_func(
# func,
# np.random.randn(4, 3).astype(np.float32),
# axes=None,
# keepdims=keepdims,
# noop_with_empty_axes=True,
# )
# `axes` provided -> reduce over specified axes.
verify_reduce_func(
func,
np.random.randn(3, 3, 3, 1).astype(np.float32),
axes=(1, 2),
keepdims=keepdims,
)
@pytest.mark.parametrize("in_dtype", [np.float32, np.int32])
@pytest.mark.parametrize("axis", [None, 0, 1, 2])
@pytest.mark.parametrize("keepdims", [None, True, False])
def test_arg_min_max(in_dtype, axis, keepdims):
def verify_arg_min_max(input_dim, in_dtype, op_name="ArgMax", axis=None, keepdims=None):
a_np1 = np.random.uniform(-10, 10, input_dim).astype(in_dtype)
out_shape = list(a_np1.shape)
def_axis = axis if axis is not None else 0
if keepdims == 1 or keepdims is None:
out_shape[def_axis] = 1
else:
out_shape.pop(def_axis)
node = helper.make_node(op_name, inputs=["a_np1"], outputs=["out"])
if keepdims is not None:
keepdims_attr = helper.make_attribute("keepdims", keepdims)
node.attribute.append(keepdims_attr)
if axis is not None:
axis_attr = helper.make_attribute("axis", axis)
node.attribute.append(axis_attr)
graph = helper.make_graph(
[node],
"argreduce_test",
inputs=[helper.make_tensor_value_info("a_np1", TensorProto.INT32, list(a_np1.shape))],
outputs=[helper.make_tensor_value_info("out", TensorProto.INT64, list(out_shape))],
)
model = helper.make_model(graph, producer_name="arg_min_max_test")
check_correctness(model)
verify_arg_min_max([3, 4, 4], in_dtype, "ArgMax", axis, keepdims)
verify_arg_min_max([3, 4, 4], in_dtype, "ArgMin", axis, keepdims)
@pytest.mark.parametrize("axis", [-1, 0, 1])
@pytest.mark.parametrize("largest", [True, False])
def test_topk(axis: int, largest: int):
in_shape = [32, 32, 32]
k_value = 4
out_shape = in_shape
out_shape[axis] = k_value
k = make_constant_node("k", TensorProto.INT64, [1], [k_value])
node = onnx.helper.make_node(
"TopK",
inputs=["data", "k"],
outputs=["values", "indices"],
axis=axis,
largest=largest,
)
graph = helper.make_graph(
[k, node],
"topk_test",
inputs=[helper.make_tensor_value_info("data", TensorProto.FLOAT, in_shape)],
outputs=[
helper.make_tensor_value_info("values", TensorProto.FLOAT, out_shape),
helper.make_tensor_value_info("indices", TensorProto.INT64, out_shape),
],
)
model = helper.make_model(graph, producer_name="topk_test")
check_correctness(model)
@pytest.mark.parametrize("dynamic", [False, True])
def test_expand(dynamic):
def _test_expand(name, data, shape, ref_data):
shape_array = np.array(shape)
shape_node = onnx.helper.make_node(
"Constant",
inputs=[],
outputs=["shape"],
value=onnx.helper.make_tensor(
name="const_tensor",
data_type=onnx.TensorProto.INT64,
dims=shape_array.shape,
vals=shape_array.flatten().astype("int64"),
),
)
expand_node = helper.make_node("Expand", ["in", "shape"], ["out"])
in_shape = list(data.shape)
out_shape = list(ref_data.shape)
if dynamic:
in_shape = ["?" for _ in range(len(in_shape))]
out_shape = ["?" for _ in range(len(out_shape))]
graph = helper.make_graph(
[shape_node, expand_node],
"expand_teint64st",
inputs=[helper.make_tensor_value_info("in", TensorProto.FLOAT, in_shape)],
outputs=[helper.make_tensor_value_info("out", TensorProto.FLOAT, out_shape)],
)
model = helper.make_model(graph, producer_name=name)
check_correctness(model, inputs={"in": data})
def _test_expand_dynamic_shapeexpr(name, data, shape_data, shape, ref_data):
shape_node = onnx.helper.make_node("Shape", inputs=["in_2"], outputs=["shape"])
expand_node = helper.make_node("Expand", ["in", "shape"], ["out"])
in_shape = list(data.shape)
out_shape = list(ref_data.shape)
graph = helper.make_graph(
[shape_node, expand_node],
"expand_test",
inputs=[
helper.make_tensor_value_info("in", TensorProto.FLOAT, in_shape),
helper.make_tensor_value_info("in_2", TensorProto.FLOAT, shape),
],
outputs=[helper.make_tensor_value_info("out", TensorProto.FLOAT, out_shape)],
)
model = helper.make_model(graph, producer_name=name)
check_correctness(model, inputs={"in": data, "in_2": shape_data})
if not dynamic:
in_shape = (3, 1)
shape = (3, 4)
data = np.random.uniform(size=in_shape).astype(np.float32)
ref_data = np.tile(data, 4)
_test_expand("expand_with_dim_unchanged_test", data, shape, ref_data)
in_shape = (3, 1)
shape = (1, 3, 4)
data = np.random.uniform(size=in_shape).astype(np.float32)
ref_data = np.tile(data, (1, 1, 4))
_test_expand("expand_with_diff_dim", data, shape, ref_data)
in_shape = (3, 1)
shape = (1, 1, 3, 1)
data = np.random.uniform(size=in_shape).astype(np.float32)
ref_data = np.tile(data, (1, 1, 1, 1))
_test_expand("expand_with_the_same_suffix_dims", data, shape, ref_data)
else:
in_shape = (1, 32, 32)
shape = ("batch", 32, 32)
data = np.random.uniform(size=in_shape).astype(np.float32)
shape_data = np.random.uniform(size=(64, 32, 32)).astype(np.float32)
ref_data = np.tile(data, (64, 1, 1))
_test_expand_dynamic_shapeexpr("expand_with_dynamic_dim", data, shape_data, shape, ref_data)
def test_expand_incompatible_broadcasting():
"""
This test case reproduces the error where input tensor shape at dim 1 is 25
and target shape at dim 3 is 56, which violates ONNX broadcasting rules
"""
def _test_expand_error_case(name, data_shape, target_shape_vals):
data = np.random.uniform(size=data_shape).astype(np.float32)
shape_array = np.array(target_shape_vals, dtype=np.int64)
shape_node = onnx.helper.make_node(
"Constant",
inputs=[],
outputs=["shape"],
value=onnx.helper.make_tensor(
name="const_tensor",
data_type=onnx.TensorProto.INT64,
dims=shape_array.shape,
vals=shape_array.flatten(),
),
)
expand_node = helper.make_node("Expand", ["in", "shape"], ["out"])
graph = helper.make_graph(
[shape_node, expand_node],
"expand_error_test",
inputs=[helper.make_tensor_value_info("in", TensorProto.FLOAT, list(data.shape))],
outputs=[helper.make_tensor_value_info("out", TensorProto.FLOAT, target_shape_vals)],
)
model = helper.make_model(graph, producer_name=name)
with pytest.raises(ValueError) as exc_info:
from_onnx(model, keep_params_in_input=True)
error_msg = str(exc_info.value)
assert (
"broadcast" in error_msg.lower() or "incompatible" in error_msg.lower()
), f"Expected broadcasting error, but got: {error_msg}"
# Test case 1: Reproduce the exact error from the issue-17769
# Input shape: (25,), target shape: (1, 1, 1, 56)
# This should faill because input dim 1 (25) != target dim 3 (56) and neither is 1
_test_expand_error_case(
"expand_incompatible_25_to_56",
data_shape=(25,),
target_shape_vals=(1, 1, 1, 56),
)
# Test case 2: Another incompatible case
# Input shape: (1, 25), target shape: (1, 1, 1, 56)
# After right-alignment, input (1, 1, 1, 25) vs. target (1, 1, 1, 56)
# This should fail because 25 != 56 and neither is 1
_test_expand_error_case(
"expand_incompatible_aligned_25_to_56",
data_shape=(1, 25),
target_shape_vals=(1, 1, 1, 56),
)
# Test case 3: Valid case for comparison - should not raise error
def _test_expand_valid_case():
"""Test a valid expand case to ensure our fix doesn't break valid operations"""
data_shape = (1, 25)
target_shape_vals = [2, 25] # Valid: input (1, 25) can broadcast to (2, 25)
data = np.random.uniform(size=data_shape).astype(np.float32)
shape_array = np.array(target_shape_vals, dtype=np.int64)
shape_node = onnx.helper.make_node(
"Constant",
inputs=[],
outputs=["shape"],
value=onnx.helper.make_tensor(
name="const_tensor",
data_type=onnx.TensorProto.INT64,
dims=shape_array.shape,
vals=shape_array.flatten(),
),
)
expand_node = helper.make_node("Expand", ["in", "shape"], ["out"])
graph = helper.make_graph(
[shape_node, expand_node],
"expand_valid_test",
inputs=[helper.make_tensor_value_info("in", TensorProto.FLOAT, list(data.shape))],
outputs=[helper.make_tensor_value_info("out", TensorProto.FLOAT, target_shape_vals)],
)
model = helper.make_model(graph, producer_name="expand_valid_test_case")
try:
tvm_model = from_onnx(model, keep_params_in_input=True)
except Exception as e:
pytest.fail(f"Valid expand case should not fail, but got error: {e}")
_test_expand_valid_case()
# TODO(jwfromm) Current approach to dynamic expand is technically not well formed. Reenable once fixed.
@pytest.mark.skip("Produces ill-formed IR")
def test_constantofshape():
def verify_constantofshape(input_dim, value, dtype):
fill_node = helper.make_node(
"ConstantOfShape",
["input"],
["output"],
value=helper.make_tensor(
"value", helper.np_dtype_to_tensor_dtype(np.dtype(dtype)), (1,), (value,)
),
)
inputs = [helper.make_tensor_value_info("input", TensorProto.INT64, [len(input_dim)])]
graph = helper.make_graph(
[fill_node],
"fill_test",
inputs,
initializer=[
helper.make_tensor(
"input",
TensorProto.INT64,
[len(input_dim)],
np.asarray(input_dim).astype("int64"),
)
],
outputs=[
helper.make_tensor_value_info(
"output", helper.np_dtype_to_tensor_dtype(np.dtype(dtype)), input_dim
)
],
)
model = helper.make_model(graph, producer_name="fill_test")
input_np = np.array(input_dim).astype("int64")
check_correctness(model, inputs={"input": input_np})
verify_constantofshape((2, 3, 4, 5), 10, "float32")
verify_constantofshape((3, 3), 0, "int32")
verify_constantofshape((1, 2, 3), -1, "float32")
def test_slice():
def verify_slice(data_shape, output_shape, starts, ends, axes=None, steps=None):
if isinstance(starts, list):
starts = np.array(starts, "int64")
if isinstance(ends, list):
ends = np.array(ends, "int64")
if isinstance(axes, list):
axes = np.array(axes, "int64")
if isinstance(steps, list):
steps = np.array(steps, "int64")
slice_inputs = ["x", "starts", "ends"]
initializer = [
helper.make_tensor("starts", TensorProto.INT64, starts.shape, starts),
helper.make_tensor("ends", TensorProto.INT64, ends.shape, ends),
]
if axes is not None:
initializer.append(helper.make_tensor("axes", TensorProto.INT64, axes.shape, axes))
slice_inputs.append("axes")
if steps is not None:
initializer.append(helper.make_tensor("steps", TensorProto.INT64, steps.shape, steps))
slice_inputs.append("steps")
slice_node = helper.make_node("Slice", inputs=slice_inputs, outputs=["y"])
graph = helper.make_graph(
[slice_node],
"slice_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, data_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, output_shape)],
initializer=initializer,
)
model = helper.make_model(graph, producer_name="slice_test")
check_correctness(model)
# Test with all parameters set.
verify_slice([20, 10, 5], [3, 10, 5], starts=[0, 0], ends=[3, 10], axes=[0, 1], steps=[1, 1])
# Test with default axes and steps.
verify_slice([20, 10, 5], [3, 10, 5], starts=[0, 0], ends=[3, 10])
# Test with negative steps.
verify_slice(
[20, 10, 5],
[19, 3, 2],
starts=[20, 10, 4], # NOTE: the start is out of bounds
ends=[0, 0, 1],
steps=[-1, -3, -2],
axes=[0, 1, 2],
)
verify_slice([20, 10, 5], [10, 5], starts=[0, 0], ends=[3, 10], axes=[1, 2])
verify_slice([20, 10, 5], [10, 5], starts=[0, 0], ends=[3, 10], axes=[1, 2])
# TODO (gigiblender): Enable this test when we have a way to pass the steps but not axes.
# verify_slice(
# [20, 10, 5],
# [19, 3, 2],
# starts=[20, 10, 4],
# ends=[0, 0, 1],
# steps=[-1, -3, -2],
# )
def test_slice_dynamic_shape():
def verify_slice(
data_shape, data_instance_shape, output_shape, starts, ends, axes=None, steps=None
):
if isinstance(starts, list):
starts = np.array(starts, "int64")
if isinstance(ends, list):
ends = np.array(ends, "int64")
if isinstance(axes, list):
axes = np.array(axes, "int64")
if isinstance(steps, list):
steps = np.array(steps, "int64")
slice_inputs = ["y", "starts", "ends"]
initializer = [
helper.make_tensor("starts", TensorProto.INT64, starts.shape, starts),
helper.make_tensor("ends", TensorProto.INT64, ends.shape, ends),
]
if axes is not None:
initializer.append(helper.make_tensor("axes", TensorProto.INT64, axes.shape, axes))
slice_inputs.append("axes")
if steps is not None:
initializer.append(helper.make_tensor("steps", TensorProto.INT64, steps.shape, steps))
slice_inputs.append("steps")
shape_node = helper.make_node("Shape", inputs=["x"], outputs=["y"])
slice_node = helper.make_node("Slice", inputs=slice_inputs, outputs=["z"])
graph = helper.make_graph(
[shape_node, slice_node],
"slice_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, data_shape),
],
outputs=[helper.make_tensor_value_info("z", TensorProto.INT64, output_shape)],
initializer=initializer,
)
model = helper.make_model(graph, producer_name="slice_test")
inputs = {"x": rg.standard_normal(size=data_instance_shape).astype("float32")}
check_correctness(model, inputs)
verify_slice([20, 10, 5], [20, 10, 5], [2], starts=[0], ends=[2], axes=[0])
verify_slice(["A", 10, 5], [20, 10, 5], [2], starts=[0], ends=[2], axes=[0])
verify_slice(["A", "B", 5], [20, 10, 5], [2], starts=[0], ends=[2], axes=[0])
verify_slice([20, 10, "C"], [20, 10, 5], [2], starts=[0], ends=[2], axes=[0])
verify_slice(["A", "B", "C"], [20, 10, 5], [2], starts=[0], ends=[2], axes=[0])
verify_slice([20, 10, 5], [20, 10, 5], [1], starts=[1], ends=[2], axes=[0])
verify_slice(["A", 10, 5], [20, 10, 5], [1], starts=[1], ends=[2], axes=[0])
verify_slice(["A", "B", 5], [20, 10, 5], [1], starts=[1], ends=[2], axes=[0])
verify_slice([20, 10, "C"], [20, 10, 5], [1], starts=[1], ends=[2], axes=[0])
verify_slice(["A", "B", "C"], [20, 10, 5], [1], starts=[1], ends=[2], axes=[0])
verify_slice([20, 10, 5], [20, 10, 5], [2], starts=[1], ends=[3], axes=[0])
verify_slice(["A", 10, 5], [20, 10, 5], [2], starts=[1], ends=[3], axes=[0])
verify_slice(["A", "B", 5], [20, 10, 5], [2], starts=[1], ends=[3], axes=[0])
verify_slice([20, 10, "C"], [20, 10, 5], [2], starts=[1], ends=[3], axes=[0])
verify_slice(["A", "B", "C"], [20, 10, 5], [2], starts=[1], ends=[3], axes=[0])
# TODO Enable dynamism
@pytest.mark.parametrize("dynamic", [False])
def test_attention(dynamic):
def verify_attention(
input_,
weight,
bias,
mask_index,
num_heads,
mask_filter_value,
qkv_hidden_sizes,
relative_position_bias,
):
node = onnx.helper.make_node(
"Attention",
inputs=["input", "weight", "bias", "mask_index", "", "relative_position_bias"],
outputs=["output"],
domain="com.microsoft",
num_heads=num_heads,
# TODO(jwfromm) OnnxRT doesnt work with this attribute, figure out why not.
# mask_filter_value=mask_filter_value,
qkv_hidden_sizes=qkv_hidden_sizes,
)
input_shape = list(input_.shape)
weight_shape = list(weight.shape)
bias_shape = list(bias.shape)
mask_shape = list(mask_index.shape)
relative_position_bias_shape = list(relative_position_bias.shape)
output_shape = list(input_.shape)
if dynamic:
input_shape = ["?" for _ in range(len(input_.shape))]
weight_shape = ["?" for _ in range(len(weight.shape))]
bias_shape = ["?" for _ in range(len(bias.shape))]
mask_shape = ["?" for _ in range(len(mask_index.shape))]
output_shape = ["?" for _ in range(len(input_.shape))]
graph = helper.make_graph(
[node],
"attention_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, input_shape),
helper.make_tensor_value_info("weight", TensorProto.FLOAT, weight_shape),
helper.make_tensor_value_info("bias", TensorProto.FLOAT, bias_shape),
helper.make_tensor_value_info("mask_index", TensorProto.INT32, mask_shape),
helper.make_tensor_value_info(
"relative_position_bias", TensorProto.FLOAT, relative_position_bias_shape
),
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, output_shape),
],
)
model = helper.make_model(graph, producer_name="attention_test")
check_correctness(
model,
inputs={
"input": input_,
"weight": weight,
"bias": bias,
"mask_index": mask_index,
"relative_position_bias": relative_position_bias,
},
# Maximum observed delta from 500 iterations was 2e-4.
atol=1e-3,
)
# "present" output should be nullptr when the "past" input isn't included,
# but ort requires an output shape to be specified?
# verify_with_ort_with_inputs(
# model,
# [input_, weight, bias, mask_index],
# [input_.shape, present_output_shape],
# target=target,
# dev=dev,
# rtol=1e-4,
# atol=1e-4,
# )
input_hidden_size = 128
batch_size = 4
sequence_length = 4
num_heads = 12
qkv_hidden_sizes = [192, 192, 96]
mask_filter_value = -512.0
dtype = "float32"
input_array = np.random.random((batch_size, sequence_length, input_hidden_size)).astype(dtype)
weight = np.random.normal(size=(input_hidden_size, sum(qkv_hidden_sizes))).astype(dtype) * 0.1
bias = np.random.randn(sum(qkv_hidden_sizes)).astype(dtype)
mask_index = np.random.randint(2, size=(batch_size, sequence_length)).astype("int32")
relative_position_bias = np.random.randn(
batch_size, num_heads, sequence_length, sequence_length
).astype(dtype)
verify_attention(
input_array,
weight,
bias,
mask_index,
num_heads,
mask_filter_value,
qkv_hidden_sizes,
relative_position_bias,
)
@pytest.mark.parametrize("dynamic", [True, False])
def test_pad(dynamic):
if dynamic:
pytest.skip("Dynamic pad not supported")
def verify_pad(input_shape, pads, mode="constant", value=0.0):
indata = np.random.normal(size=input_shape).astype(np.float32)
# numpy expect result
len_dim = len(pads) // 2
np_pads = [(pads[i], pads[i + len_dim]) for i in range(len_dim)]
pads = np.array(pads)
# onnx graph
if mode in ["edge", "reflect"]:
outdata = np.pad(indata, pad_width=np_pads, mode=mode)
node = helper.make_node("Pad", inputs=["input", "pads"], outputs=["output"], mode=mode)
graph = helper.make_graph(
[node],
"pad_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, list(indata.shape))
],
initializer=[helper.make_tensor("pads", TensorProto.INT64, (len(pads),), pads)],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, list(outdata.shape))
],
)
else:
outdata = np.pad(indata, pad_width=np_pads, mode="constant", constant_values=value)
node = helper.make_node(
"Pad",
inputs=["input", "pads", "constant_value"],
outputs=["output"],
mode="constant",
)
graph = helper.make_graph(
[node],
"pad_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, list(indata.shape))
],
initializer=[
helper.make_tensor("pads", TensorProto.INT64, (len(pads),), pads),
helper.make_tensor("constant_value", TensorProto.FLOAT, (1,), [value]),
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, list(outdata.shape))
],
)
model = helper.make_model(graph, producer_name="pad_test")
check_correctness(model)
verify_pad((2, 2), [0, 1, 0, 0], "constant", 0.0)
verify_pad((2, 3), [1, 0, 0, 1], "constant", 0.0)
verify_pad((3, 2), [0, 0, 1, 0], "constant", 5.0)
verify_pad((1, 3, 4, 5), [0, 1, 1, 1, 0, 0, 1, 1], "reflect")
@pytest.mark.parametrize("dynamic", [True, False])
def test_pad_v2(dynamic):
if dynamic:
pytest.skip("Dynamic pad not supported")
def verify_pad(input_shape, pads, mode="constant", value=0.0):
indata = np.random.normal(size=input_shape).astype(np.float32)
# numpy expect result
len_dim = len(pads) // 2
np_pads = [(pads[i], pads[i + len_dim]) for i in range(len_dim)]
pads = np.array(pads)
# onnx graph
if mode in ["edge", "reflect"]:
outdata = np.pad(indata, pad_width=np_pads, mode=mode)
node = helper.make_node(
"Pad", inputs=["input"], outputs=["output"], mode=mode, pads=pads
)
graph = helper.make_graph(
[node],
"pad_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, list(indata.shape))
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, list(outdata.shape))
],
)
else:
outdata = np.pad(indata, pad_width=np_pads, mode="constant", constant_values=value)
node = helper.make_node(
"Pad",
inputs=["input"],
outputs=["output"],
mode="constant",
pads=pads,
value=value,
)
graph = helper.make_graph(
[node],
"pad_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, list(indata.shape))
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.FLOAT, list(outdata.shape))
],
)
model = helper.make_model(graph, producer_name="pad_test")
check_correctness(model=model, opset=10)
verify_pad((2, 2), [0, 1, 0, 0], "constant", 0.0)
verify_pad((2, 3), [1, 0, 0, 1], "constant", 0.0)
verify_pad((3, 2), [0, 0, 1, 0], "constant", 5.0)
verify_pad((1, 3, 4, 5), [0, 1, 1, 1, 0, 0, 1, 1], "reflect")
@pytest.mark.parametrize("fp_arith", [np.float16, np.float32])
@pytest.mark.parametrize("dynamic", [True, False])
def test_split(fp_arith, dynamic):
def verify_split(indata_shape, outdata_shapes, split, axis=0, pass_split=True, opset=11):
indata = np.random.normal(size=indata_shape).astype(fp_arith)
input_names = ["input"]
initializer = []
if split:
split_index = range(len(split))
else:
split_index = range(len(outdata_shapes))
indata_shape = list(indata.shape)
if dynamic:
indata_shape = ["?" for _ in range(len(indata.shape))]
outdata_shapes = [["?" for _ in range(len(o))] for o in outdata_shapes]
inputs = [
helper.make_tensor_value_info(
"input", helper.np_dtype_to_tensor_dtype(indata.dtype), indata_shape
)
]
split_constant = None
if pass_split:
if opset >= 13:
np_split = np.array(split).astype(np.int64)
split_constant = make_constant_node(
"split", onnx.TensorProto.INT64, list(np_split.shape), np_split
)
input_names.append("split")
node = helper.make_node(
"Split",
inputs=input_names,
outputs=[f"output_{i}" for i in range(len(split_index))],
axis=axis,
)
if pass_split and opset < 13:
split_attr = helper.make_attribute("split", split)
node.attribute.append(split_attr)
nodes = [split_constant, node] if split_constant else [node]
graph = helper.make_graph(
nodes,
"split_test",
inputs=inputs,
initializer=initializer,
outputs=[
helper.make_tensor_value_info(
f"output_{i}",
helper.np_dtype_to_tensor_dtype(indata.dtype),
list(outdata_shapes[i]),
)
for i in range(len(split_index))
],
)
model = helper.make_model(graph, producer_name="split_test")
check_correctness(model, inputs={"input": indata}, opset=opset)
# 1D
verify_split(6, [[2], [2], [2]], [2, 2, 2])
verify_split(6, [[2], [2], [2]], [2, 2, 2], pass_split=False)
verify_split(6, [[2], [1], [3]], [2, 1, 3])
verify_split(6, [[2], [1], [3]], [2, 1, 3], opset=13)
# 2D
verify_split(
(4, 4),
[[2, 2], [2, 2]],
[2, 2],
axis=1,
)
verify_split(
(4, 4),
[[2, 2], [2, 2]],
[2, 2],
axis=1,
opset=13,
)
# Split evenly (unstack)
verify_split(3, [[1], [1], [1]], False, pass_split=False)
# Split a single value to a single value
verify_split(1, [[1]], [1], pass_split=True)
# Test that the default case modifies nothing when split list has length one
verify_split((1, 2), [[2]], [2], axis=1)
verify_split((1, 2), [[2]], [1])
@pytest.mark.parametrize("dynamic", [True, False])
def test_tile(dynamic):
def verify_tile(in_shape, repeats, out_shape):
node = helper.make_node("Tile", inputs=["input", "repeats"], outputs=["out"])
if dynamic:
indata = np.random.normal(size=in_shape).astype(np.float32)
in_shape = ["?" for _ in range(len(in_shape))]
out_shape = ["?" for _ in range(len(out_shape))]
graph = helper.make_graph(
[node],
"tile_test",
inputs=[
helper.make_tensor_value_info("input", TensorProto.FLOAT, in_shape),
],
initializer=[
helper.make_tensor("repeats", TensorProto.INT64, list(repeats.shape), repeats)
],
outputs=[helper.make_tensor_value_info("out", TensorProto.FLOAT, out_shape)],
)
model = helper.make_model(graph, producer_name="tile_test")
if dynamic:
check_correctness(model, {"input": indata})
else:
check_correctness(model)
x = np.random.rand(2, 3, 4, 5).astype(np.float32)
repeats = np.random.randint(low=1, high=10, size=(np.ndim(x),)).astype(np.int64)
z_array = np.tile(x, repeats)
verify_tile(x.shape, repeats, z_array.shape)
def _generate_roi_cases():
# Base case when with_roi is False
roi_list = [
pytest.param(False, None, id="no_roi"),
]
# Valid when with_roi is True
roi_cases = [
[0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 1.0],
[0.1, 0.1, 0.9, 0.9],
[0.2, 0.2, 0.8, 0.8],
[0.3, 0.3, 0.7, 0.7],
[0.4, 0.4, 0.6, 0.6],
[0.5, 0.5, 0.5, 0.5],
[0.1, 0.2, 0.9, 0.8],
]
for roi in roi_cases:
roi_list.append(pytest.param(True, roi, id=f"roi_{'_'.join(str(x) for x in roi)}"))
return roi_list
@pytest.mark.parametrize("with_roi, roi_list", _generate_roi_cases())
def test_resize(with_roi, roi_list):
resize_node = helper.make_node(
"Resize", ["X", "roi" if with_roi else "", "scales"], ["Y"], mode="cubic"
)
graph = helper.make_graph(
[resize_node],
"resize_test",
inputs=[
helper.make_tensor_value_info("X", TensorProto.FLOAT, [1, 3, 32, 32]),
],
initializer=[
helper.make_tensor("scales", TensorProto.FLOAT, [4], [1.0, 1.0, 2.0, 2.0]),
*([helper.make_tensor("roi", TensorProto.FLOAT, [4], roi_list)] if with_roi else []),
],
outputs=[
helper.make_tensor_value_info("Y", TensorProto.FLOAT, [1, 3, 64, 64]),
],
)
model = helper.make_model(graph, producer_name="resize_test")
check_correctness(model)
def test_einsum():
eqn = "ij->i"
einsum_node = helper.make_node("Einsum", ["x"], ["y"], equation=eqn)
graph = helper.make_graph(
[einsum_node],
"einsum_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, [3, 4]),
],
outputs=[
helper.make_tensor_value_info("y", TensorProto.FLOAT, [3]),
],
)
model = helper.make_model(graph, producer_name="einsum_test")
check_correctness(model)
def test_range():
range_node = helper.make_node(
"Range",
["start", "limit", "delta"],
["output"],
)
graph = helper.make_graph(
[range_node],
"range_test",
inputs=[],
initializer=[
helper.make_tensor("start", TensorProto.INT64, [], [1]),
helper.make_tensor("limit", TensorProto.INT64, [], [5]),
helper.make_tensor("delta", TensorProto.INT64, [], [2]),
],
outputs=[
helper.make_tensor_value_info("output", TensorProto.INT64, [2]),
],
)
model = helper.make_model(graph, producer_name="range_test")
check_correctness(model)
def test_batch_norm():
batch_norm_node = helper.make_node(
"BatchNormalization", ["x", "s", "bias", "mean", "var"], ["y"], epsilon=1e-2
)
graph = helper.make_graph(
[batch_norm_node],
"batch_norm_test",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, [2, 3, 4, 5]),
helper.make_tensor_value_info("s", TensorProto.FLOAT, [3]),
helper.make_tensor_value_info("bias", TensorProto.FLOAT, [3]),
helper.make_tensor_value_info("mean", TensorProto.FLOAT, [3]),
helper.make_tensor_value_info("var", TensorProto.FLOAT, [3]),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, [2, 3, 4, 5])],
)
model = helper.make_model(graph, producer_name="batch_norm_test")
check_correctness(model, opset=15)
@pytest.mark.parametrize("pool_name", ["MaxPool", "AveragePool", "LpPool"])
@pytest.mark.parametrize(
"shape, auto_pad, kernel_shape, strides, pads",
[
# Pool1D
([1, 1, 32], "NOTSET", [3], [1], [1, 1]),
# Pool1D with stride
([1, 1, 32], "NOTSET", [3], [2], [1, 1]),
# Pool1D with stride and autopadding
([1, 1, 32], "SAME_UPPER", [7], [2], None),
([1, 1, 32], "SAME_LOWER", [4], [4], None),
([1, 1, 32], "VALID", [5], [5], None),
([1, 1, 32], "SAME_UPPER", [3], [1], None),
# Pool2D
([1, 1, 32, 32], "NOTSET", [3, 3], [1, 1], [1, 1, 1, 1]),
# Pool2D with stride
([1, 1, 32, 32], "NOTSET", [3, 3], [2, 2], [1, 1, 1, 1]),
# Pool2D with stride and autopadding
([1, 1, 32, 32], "SAME_UPPER", [3, 7], [3, 2], None),
([1, 1, 32, 32], "SAME_LOWER", [3, 3], [2, 2], None),
([1, 1, 32, 32], "VALID", [3, 3], [2, 2], None),
([1, 1, 32, 32], "SAME_UPPER", [3, 3], [1, 1], None),
# Pool3D
([1, 1, 32, 32, 32], "NOTSET", [3, 3, 4], [1, 1, 1], [1, 2, 1, 1, 2, 2]),
# Pool3D with stride
([1, 1, 32, 32, 32], "NOTSET", [3, 4, 3], [2, 2, 3], [1, 1, 1, 1, 1, 2]),
# Pool3D with stride and autopadding
([1, 1, 32, 32, 32], "SAME_UPPER", [4, 3, 3], [3, 2, 2], None),
([1, 1, 32, 32, 32], "SAME_LOWER", [3, 3, 4], [2, 2, 2], None),
([1, 1, 32, 32, 32], "VALID", [3, 3, 5], [2, 2, 3], None),
([1, 1, 32, 32, 32], "SAME_UPPER", [3, 3, 5], [1, 1, 1], None),
],
)
def test_pool(
pool_name: str,
shape: List[int],
auto_pad: str,
kernel_shape: List[int],
strides: List[int],
pads: List[int],
):
verify_unary(
pool_name,
shape,
attrs={
"kernel_shape": kernel_shape,
"strides": strides,
"pads": pads,
"auto_pad": auto_pad,
},
)
def test_global_average_pool():
verify_unary("GlobalAveragePool", [1, 3, 32])
verify_unary("GlobalAveragePool", [1, 3, 32, 32])
verify_unary("GlobalAveragePool", [1, 3, 32, 32, 32])
def test_global_max_pool():
verify_unary("GlobalMaxPool", [1, 3, 32])
verify_unary("GlobalMaxPool", [1, 3, 32, 32])
verify_unary("GlobalMaxPool", [1, 3, 32, 32, 32])
@pytest.mark.parametrize("p", [1, 2, 3])
def test_global_lp_pool(p: int):
verify_unary("GlobalLpPool", [1, 3, 32], attrs={"p": p})
verify_unary("GlobalLpPool", [1, 3, 32, 32], attrs={"p": p})
verify_unary("GlobalLpPool", [1, 3, 32, 32, 32], attrs={"p": p})
@pytest.mark.parametrize("kernel_shape", [[2, 2], [3, 3]])
@pytest.mark.parametrize("pads", [None, [1, 1, 1, 1]])
@pytest.mark.parametrize("strides", [None, [2, 2]])
def test_maxunpool(kernel_shape, pads, strides):
input_shape = [16, 3, 16, 16]
input_names = ["X", "I"]
input_info = [
helper.make_tensor_value_info("X", TensorProto.FLOAT, input_shape),
helper.make_tensor_value_info("I", TensorProto.INT64, input_shape),
]
attrs = {"kernel_shape": kernel_shape}
if pads is not None:
attrs["pads"] = pads
if strides is not None:
attrs["strides"] = strides
node = helper.make_node("MaxUnpool", inputs=input_names, outputs=["y"], **attrs)
graph = helper.make_graph(
[node],
"maxunpool_test",
inputs=input_info,
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, None)],
)
max_random = int(np.prod(np.array(kernel_shape)))
indices = np.random.randint(0, max_random, size=input_shape)
model = helper.make_model(graph, producer_name="maxunpool_test")
check_correctness(model, inputs={"I": indices})
def test_flatten():
verify_unary("Flatten", [1, 3, 32, 32], attrs={"axis": 0})
verify_unary("Flatten", [1, 3, 32, 32], attrs={"axis": -1})
verify_unary("Flatten", [1, 3, 32, 32], attrs={"axis": 2})
def test_flatten_dynamic():
verify_unary_dynamic_shape("Flatten", [1, "A", "B", 32], [1, 3, 32, 32], attrs={"axis": 0})
verify_unary_dynamic_shape("Flatten", [1, "A", "B", 32], [1, 3, 32, 32], attrs={"axis": -1})
verify_unary_dynamic_shape("Flatten", [1, "A", "B", 32], [1, 3, 32, 32], attrs={"axis": 2})
def test_onehot():
one_hot_node = helper.make_node("OneHot", ["indices", "depth", "values"], ["y"], axis=1)
graph = helper.make_graph(
[one_hot_node],
"one_hot_test",
inputs=[
helper.make_tensor_value_info("indices", TensorProto.INT64, [2, 2]),
],
initializer=[
helper.make_tensor("depth", TensorProto.INT64, [], [10]),
helper.make_tensor("values", TensorProto.FLOAT, [2], [3, 1]),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, [2, 10, 2])],
)
model = helper.make_model(graph, producer_name="one_hot_test")
values = {
"indices": np.array([[1, 9], [2, 4]], dtype="int64"),
}
check_correctness(model, inputs=values)
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("sorted", [0, 1])
def test_unique(axis: Optional[int], sorted: int):
input_shape = [32, 32]
if axis is None:
output_shape = [-1]
else:
output_shape = [32, 32]
output_shape[axis] = -1
unique_node = helper.make_node("Unique", ["x"], ["y"], axis=axis, sorted=sorted)
graph = helper.make_graph(
[unique_node],
"unique_test",
inputs=[helper.make_tensor_value_info("x", TensorProto.FLOAT, input_shape)],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="unique_test")
check_correctness(model)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (4, 5, 6), (7, 8, 9, 10)])
def test_nonzero(shape):
verify_unary("NonZero", shape, input_dtype=TensorProto.BOOL, output_dtype=TensorProto.INT64)
@pytest.mark.parametrize("mode", ["DCR", "CRD"])
def test_depth_to_space(mode: Literal["DCR", "CRD"]):
in_shape = [1, 8, 2, 3]
out_shape = [1, 2, 4, 6]
blocksize = 2
node = onnx.helper.make_node(
"DepthToSpace", inputs=["x"], outputs=["y"], blocksize=blocksize, mode=mode
)
graph = helper.make_graph(
[node],
"depth_to_space_test",
inputs=[helper.make_tensor_value_info("x", TensorProto.FLOAT, in_shape)],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, out_shape)],
)
model = helper.make_model(graph, producer_name="depth_to_space_test")
check_correctness(model)
def test_space_to_depth():
in_shape = [1, 2, 4, 6]
out_shape = [1, 8, 2, 3]
blocksize = 2
node = onnx.helper.make_node("SpaceToDepth", inputs=["x"], outputs=["y"], blocksize=blocksize)
graph = helper.make_graph(
[node],
"space_to_depth_test",
inputs=[helper.make_tensor_value_info("x", TensorProto.FLOAT, in_shape)],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, out_shape)],
)
model = helper.make_model(graph, producer_name="space_to_depth_test")
check_correctness(model)
def construct_sequence(input_shape: List[int], num_tensors: int, name: str = "sequence"):
inputs = [f"data{i}" for i in range(num_tensors)]
sequence_construct_node = helper.make_node("SequenceConstruct", inputs, [name])
graph_inputs = [
helper.make_tensor_value_info(f"data{i}", TensorProto.FLOAT, input_shape)
for i in range(num_tensors)
]
return sequence_construct_node, graph_inputs
def make_constant_node(name: str, data_type: int, dims: List[int], vals: List[int]):
return helper.make_node(
"Constant",
inputs=[],
outputs=[name],
value=helper.make_tensor(name=name, data_type=data_type, dims=dims, vals=vals),
)
def test_sequence_construct():
node, graph_inputs = construct_sequence(input_shape=[32, 32], num_tensors=2)
graph = helper.make_graph(
[node],
"test_sequence_construct",
inputs=graph_inputs,
outputs=[helper.make_tensor_sequence_value_info("sequence", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="test_sequence_construct")
check_correctness(model)
def test_sequence_empty():
sequence_empty_node = helper.make_node("SequenceEmpty", [], ["sequence"])
graph = helper.make_graph(
[sequence_empty_node],
"test_sequence_empty",
inputs=[],
outputs=[helper.make_tensor_sequence_value_info("sequence", TensorProto.FLOAT, [])],
)
model = helper.make_model(graph, producer_name="test_sequence_empty")
check_correctness(model)
@pytest.mark.parametrize("explicit_position", [True, False])
def test_sequence_erase(explicit_position: bool):
seq_node, graph_inputs = construct_sequence(input_shape=[32, 32], num_tensors=4)
index = make_constant_node("index", TensorProto.INT64, (), [1])
node_input = ["sequence", "index"] if explicit_position else ["sequence"]
sequence_erase_node = helper.make_node("SequenceErase", node_input, ["output"])
graph = helper.make_graph(
[index, seq_node, sequence_erase_node],
"test_sequence_erase",
inputs=graph_inputs,
outputs=[helper.make_tensor_sequence_value_info("output", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="test_sequence_erase")
check_correctness(model)
@pytest.mark.parametrize("explicit_position", [True, False])
def test_sequence_insert(explicit_position: bool):
seq_node, graph_inputs = construct_sequence(input_shape=[32, 32], num_tensors=4)
index = make_constant_node("index", TensorProto.INT64, (), [0])
node_input = ["sequence", "value", "index"] if explicit_position else ["sequence", "value"]
sequence_insert_node = helper.make_node("SequenceInsert", node_input, ["output"])
graph = helper.make_graph(
[index, seq_node, sequence_insert_node],
"test_sequence_insert",
inputs=[*graph_inputs, helper.make_tensor_value_info("value", TensorProto.FLOAT, [32, 32])],
outputs=[helper.make_tensor_sequence_value_info("output", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="test_sequence_insert")
check_correctness(model)
@pytest.mark.parametrize("new_axis", [0, 1])
def test_concat_from_sequence(new_axis: Literal[0, 1]):
if new_axis == 1:
pytest.skip("ConcatFromSequence with new_axis=1 is not supported yet")
seq_node, graph_inputs = construct_sequence(input_shape=[32, 32], num_tensors=2)
concat_from_sequence_node = helper.make_node(
"ConcatFromSequence", ["sequence"], ["output"], axis=1
)
graph = helper.make_graph(
[seq_node, concat_from_sequence_node],
"test_concat_from_sequence",
inputs=graph_inputs,
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, [64, 32])],
)
model = helper.make_model(graph, producer_name="test_concat_from_sequence")
check_correctness(model)
@pytest.mark.parametrize("split", [2, [16, 48]])
def test_split_to_sequence(split):
split_to_sequence_node = helper.make_node(
"SplitToSequence",
["data", "split"],
["output"],
axis=0,
)
split_shape = [len(split)] if isinstance(split, list) else ()
split_node = make_constant_node(
"split", TensorProto.INT64, split_shape, [split] if isinstance(split, int) else split
)
graph = helper.make_graph(
[split_node, split_to_sequence_node],
"test_split_to_sequence",
inputs=[helper.make_tensor_value_info("data", TensorProto.FLOAT, [64, 32])],
outputs=[helper.make_tensor_sequence_value_info("output", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="test_split_to_sequence")
check_correctness(model)
def test_sequence_at():
seq_node, graph_inputs = construct_sequence(input_shape=[32, 32], num_tensors=4)
index = make_constant_node("index", TensorProto.INT64, (), [1])
node_input = ["sequence", "index"]
sequence_at_node = helper.make_node("SequenceAt", node_input, ["output"])
graph = helper.make_graph(
[index, seq_node, sequence_at_node],
"test_sequence_at",
inputs=graph_inputs,
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, [32, 32])],
)
model = helper.make_model(graph, producer_name="test_sequence_at")
check_correctness(model)
def test_symbolic_shape_deduction():
index_node = helper.make_node(
"Constant",
inputs=[],
outputs=["indices"],
value=helper.make_tensor("indices", TensorProto.INT64, [], [0]),
)
shape_node = helper.make_node("Shape", ["data"], ["shape_output"])
gather_node = helper.make_node("Gather", ["shape_output", "indices"], ["gather_output"])
unsqueeze_node = helper.make_node("Unsqueeze", ["gather_output", "axes"], ["unsqueeze_output"])
constant_of_shape_node = helper.make_node(
"ConstantOfShape",
["unsqueeze_output"],
["output"],
value=helper.make_tensor("value", TensorProto.FLOAT, [], [1]),
)
graph = helper.make_graph(
[index_node, shape_node, gather_node, unsqueeze_node, constant_of_shape_node],
"test_shape_deduction",
inputs=[
helper.make_tensor_value_info("data", TensorProto.FLOAT, ["batch", "seq"]),
],
initializer=[helper.make_tensor("axes", TensorProto.INT64, [1], vals=[0])],
outputs=[helper.make_tensor_value_info("output", TensorProto.INT64, [1])],
)
model = helper.make_model(graph, producer_name="test_shape_deduction")
tvm_model = from_onnx(model, keep_params_in_input=True)
@R.function
def expected(
data: R.Tensor(("batch", "seq"), dtype="float32")
) -> R.Tensor(dtype="float32", ndim=1):
batch = T.int64()
seq = T.int64()
R.func_attr({"num_input": 1})
with R.dataflow():
gv: R.Tensor((batch,), dtype="float32") = R.broadcast_to(
R.const(1, "float32"), R.shape([batch])
)
R.output(gv)
return gv
# TODO(siyuan): Enable assertion after fixing the SizeVar roundtrip issue
# tvm.ir.assert_structural_equal(expected, tvm_model["main"])
def test_multi_inputs_with_same_symbolic_shape():
concat_node = helper.make_node("Concat", ["data1", "data2"], ["output"], axis=1)
graph = helper.make_graph(
[concat_node],
"test_multi_symbolic_shape_input",
inputs=[
helper.make_tensor_value_info("data1", TensorProto.FLOAT, ["batch", 1]),
helper.make_tensor_value_info("data2", TensorProto.FLOAT, ["batch", 1]),
],
outputs=[helper.make_tensor_value_info("output", TensorProto.FLOAT, ["batch", 2])],
)
model = helper.make_model(graph, producer_name="test_multi_symbolic_shape_input")
check_correctness(model)
def test_multi_ops_with_same_params():
reshape_node_1 = helper.make_node("Reshape", ["a", "x"], ["b"])
reshape_node_2 = helper.make_node("Reshape", ["b", "x"], ["c"])
a_shape = [16]
output_shape = [1, 16]
graph = helper.make_graph(
[reshape_node_1, reshape_node_2],
"test_multi_ops_with_same_params",
inputs=[
helper.make_tensor_value_info("a", TensorProto.FLOAT, a_shape),
],
initializer=[
helper.make_tensor("x", TensorProto.INT64, [2], output_shape),
],
outputs=[helper.make_tensor_value_info("c", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="test_multi_ops_with_same_params")
check_correctness(model)
def test_params_names_start_with_onnx():
reshape_node = helper.make_node("Reshape", ["a", "onnx::x"], ["b"])
a_shape = [16]
output_shape = [1, 16]
graph = helper.make_graph(
[reshape_node],
"test_params_names_start_with_onnx",
inputs=[
helper.make_tensor_value_info("a", TensorProto.FLOAT, a_shape),
],
initializer=[
helper.make_tensor("onnx::x", TensorProto.INT64, [2], output_shape),
],
outputs=[helper.make_tensor_value_info("b", TensorProto.FLOAT, output_shape)],
)
model = helper.make_model(graph, producer_name="test_params_names_start_with_onnx")
check_correctness(model)
def test_shape_dim_string_expression():
def _verify(x_shape, example_shape):
identity_node = helper.make_node("Identity", ["x"], ["y"])
graph = helper.make_graph(
[identity_node],
"test_var_shape_dim_containing_expressions_onnx",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, x_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, x_shape)],
)
model = helper.make_model(
graph, producer_name="test_var_shape_dim_containing_expressions_onnx"
)
inputs = {"x": generate_random_value(example_shape, TensorProto.FLOAT)}
check_correctness(model, inputs)
_verify(["A", "B", "A + B"], [3, 9, 12])
_verify(["A", "B", "A - B"], [9, 3, 6])
_verify(["A", "B", "A * B"], [9, 3, 27])
_verify(["A", "B", "A // B"], [9, 3, 3])
def test_shape_dim_string_expression_graph_add():
identity_node = helper.make_node("Identity", ["x"], ["y"])
x_shape = ["A", "B", "A + B"]
graph = helper.make_graph(
[identity_node],
"test_var_shape_dim_containing_expressions_onnx",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, x_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, x_shape)],
)
model = helper.make_model(graph, producer_name="test_var_shape_dim_containing_expressions_onnx")
tvm_model = from_onnx(model, opset=14, keep_params_in_input=True)
# fmt: off
@I.ir_module
class Expected:
@R.function
def main(x: R.Tensor(("A", "B", "A + B"), dtype="float32")) -> R.Tensor(("A", "B", "A + B"), dtype="float32"):
A = T.int64(is_size_var=True)
B = T.int64(is_size_var=True)
R.func_attr({"num_input": 1})
with R.dataflow():
gv: R.Tensor((A, B, A + B), dtype="float32") = x
R.output(gv)
return gv
# fmt: on
tvm.ir.assert_structural_equal(tvm_model, Expected)
def test_shape_dim_string_expression_graph_subtract():
identity_node = helper.make_node("Identity", ["x"], ["y"])
x_shape = ["A", "B", "A - B"]
graph = helper.make_graph(
[identity_node],
"test_var_shape_dim_containing_expressions_onnx",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, x_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, x_shape)],
)
model = helper.make_model(graph, producer_name="test_var_shape_dim_containing_expressions_onnx")
tvm_model = from_onnx(model, opset=14, keep_params_in_input=True)
# fmt: off
@I.ir_module
class Expected:
@R.function
def main(x: R.Tensor(("A", "B", "A - B"), dtype="float32")) -> R.Tensor(("A", "B", "A - B"), dtype="float32"):
A = T.int64(is_size_var=True)
B = T.int64(is_size_var=True)
R.func_attr({"num_input": 1})
with R.dataflow():
gv: R.Tensor((A, B, A - B), dtype="float32") = x
R.output(gv)
return gv
# fmt: on
tvm.ir.assert_structural_equal(tvm_model, Expected)
def test_shape_dim_string_expression_graph_mul():
identity_node = helper.make_node("Identity", ["x"], ["y"])
x_shape = ["A", "B", "A * B"]
graph = helper.make_graph(
[identity_node],
"test_var_shape_dim_containing_expressions_onnx",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, x_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, x_shape)],
)
model = helper.make_model(graph, producer_name="test_var_shape_dim_containing_expressions_onnx")
tvm_model = from_onnx(model, opset=14, keep_params_in_input=True)
# fmt: off
@I.ir_module
class Expected:
@R.function
def main(x: R.Tensor(("A", "B", "A * B"), dtype="float32")) -> R.Tensor(("A", "B", "A * B"), dtype="float32"):
A = T.int64(is_size_var=True)
B = T.int64(is_size_var=True)
R.func_attr({"num_input": 1})
with R.dataflow():
gv: R.Tensor((A, B, A * B), dtype="float32") = x
R.output(gv)
return gv
# fmt: on
tvm.ir.assert_structural_equal(tvm_model, Expected)
def test_shape_dim_string_expression_graph_div_1():
identity_node = helper.make_node("Identity", ["x"], ["y"])
# this will result in a floordiv despite not using // since the operands are always int
x_shape = ["A", "B", "A / B"]
graph = helper.make_graph(
[identity_node],
"test_var_shape_dim_containing_expressions_onnx",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, x_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, x_shape)],
)
model = helper.make_model(graph, producer_name="test_var_shape_dim_containing_expressions_onnx")
tvm_model = from_onnx(model, opset=14, keep_params_in_input=True)
# fmt: off
@I.ir_module
class Expected:
@R.function
def main(x: R.Tensor(("A", "B", "A // B"), dtype="float32")) -> R.Tensor(("A", "B", "A // B"), dtype="float32"):
A = T.int64(is_size_var=True)
B = T.int64(is_size_var=True)
R.func_attr({"num_input": 1})
with R.dataflow():
gv: R.Tensor((A, B, A // B), dtype="float32") = x
R.output(gv)
return gv
# fmt: on
tvm.ir.assert_structural_equal(tvm_model, Expected)
def test_shape_dim_string_expression_graph_div_2():
identity_node = helper.make_node("Identity", ["x"], ["y"])
x_shape = ["A", "B", "A // B"]
graph = helper.make_graph(
[identity_node],
"test_var_shape_dim_containing_expressions_onnx",
inputs=[
helper.make_tensor_value_info("x", TensorProto.FLOAT, x_shape),
],
outputs=[helper.make_tensor_value_info("y", TensorProto.FLOAT, x_shape)],
)
model = helper.make_model(graph, producer_name="test_var_shape_dim_containing_expressions_onnx")
tvm_model = from_onnx(model, opset=14, keep_params_in_input=True)
# fmt: off
@I.ir_module
class Expected:
@R.function
def main(x: R.Tensor(("A", "B", "A // B"), dtype="float32")) -> R.Tensor(("A", "B", "A // B"), dtype="float32"):
A = T.int64(is_size_var=True)
B = T.int64(is_size_var=True)
R.func_attr({"num_input": 1})
with R.dataflow():
gv: R.Tensor((A, B, A // B), dtype="float32") = x
R.output(gv)
return gv
# fmt: on
tvm.ir.assert_structural_equal(tvm_model, Expected)
def test_nms():
"""Test NonMaxSuppression operator conversion using our AllClassNMS implementation."""
nms_node = helper.make_node(
"NonMaxSuppression",
["boxes", "scores", "max_output_boxes_per_class", "iou_threshold", "score_threshold"],
["selected_indices"],
center_point_box=0,
)
boxes_shape = [1, 5, 4] # batch_size, num_boxes, 4
scores_shape = [1, 2, 5] # batch_size, num_classes, num_boxes
graph = helper.make_graph(
[nms_node],
"nms_test",
inputs=[
helper.make_tensor_value_info("boxes", TensorProto.FLOAT, boxes_shape),
helper.make_tensor_value_info("scores", TensorProto.FLOAT, scores_shape),
],
initializer=[
helper.make_tensor("max_output_boxes_per_class", TensorProto.INT64, [1], [3]),
helper.make_tensor("iou_threshold", TensorProto.FLOAT, [1], [0.5]),
helper.make_tensor("score_threshold", TensorProto.FLOAT, [1], [0.1]),
],
outputs=[helper.make_tensor_value_info("selected_indices", TensorProto.INT64, [0, 3])],
)
model = helper.make_model(graph, producer_name="nms_test")
model.opset_import[0].version = 11
# Use deterministic random inputs for consistent testing
bg = np.random.MT19937(0)
rg = np.random.Generator(bg)
boxes = rg.standard_normal(size=boxes_shape).astype(np.float32)
scores = rg.standard_normal(size=scores_shape).astype(np.float32)
inputs = {"boxes": boxes, "scores": scores}
# Run ONNX Runtime
ort_session = onnxruntime.InferenceSession(
model.SerializeToString(), providers=["CPUExecutionProvider"]
)
ort_output = ort_session.run([], inputs)
# Run TVM
tvm_model = from_onnx(model, opset=11, keep_params_in_input=True)
tvm_model = relax.transform.DecomposeOpsForInference()(tvm_model)
tvm_model = relax.transform.LegalizeOps()(tvm_model)
tvm_model, params = relax.frontend.detach_params(tvm_model)
with tvm.transform.PassContext(opt_level=3):
ex = tvm.compile(tvm_model, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
input_list = [
inputs[key.name_hint] for key in tvm_model["main"].params if key.name_hint in inputs
]
if params:
input_list += params["main"]
vm.set_input("main", *input_list)
vm.invoke_stateful("main")
tvm_output = vm.get_outputs("main")
if isinstance(tvm_output, (list, tuple)):
tvm_selected = tvm_output[0].numpy()
else:
tvm_selected = tvm_output.numpy()
ort_selected = ort_output[0]
min_rows = min(tvm_selected.shape[0], ort_selected.shape[0])
if min_rows > 0:
tvm.testing.assert_allclose(
tvm_selected[:min_rows], ort_selected[:min_rows], rtol=1e-5, atol=1e-5
)
def test_nms_algorithm_correctness():
"""Test NMS algorithm correctness with fixed data to verify suppression logic."""
nms_node = helper.make_node(
"NonMaxSuppression",
["boxes", "scores", "max_output_boxes_per_class", "iou_threshold", "score_threshold"],
["selected_indices"],
center_point_box=0,
)
# Create fixed test data with known expected results
# Boxes: [x1, y1, x2, y2] format
boxes_data = np.array(
[
[
[0.0, 0.0, 1.0, 1.0], # Box 0: [0,0,1,1] - should be selected
[
0.5,
0.5,
1.5,
1.5,
], # Box 1: [0.5,0.5,1.5,1.5] - overlaps with box 0, should be suppressed
[2.0, 2.0, 3.0, 3.0],
]
], # Box 2: [2,2,3,3] - no overlap, should be selected
dtype=np.float32,
)
# Scores: higher score = better
scores_data = np.array(
[
[[0.9, 0.8, 0.7], [0.6, 0.5, 0.4]] # Class 0: [0.9, 0.8, 0.7] - box 0 has highest score
], # Class 1: [0.6, 0.5, 0.4] - box 0 has highest score
dtype=np.float32,
)
boxes_shape = [1, 3, 4] # batch_size, num_boxes, 4
scores_shape = [1, 2, 3] # batch_size, num_classes, num_boxes
graph = helper.make_graph(
[nms_node],
"nms_test_correctness",
inputs=[
helper.make_tensor_value_info("boxes", TensorProto.FLOAT, boxes_shape),
helper.make_tensor_value_info("scores", TensorProto.FLOAT, scores_shape),
],
initializer=[
helper.make_tensor(
"max_output_boxes_per_class", TensorProto.INT64, [1], [2]
), # Only 2 boxes per class
helper.make_tensor("iou_threshold", TensorProto.FLOAT, [1], [0.5]), # IoU threshold 0.5
helper.make_tensor(
"score_threshold", TensorProto.FLOAT, [1], [0.1]
), # Score threshold 0.1
],
outputs=[helper.make_tensor_value_info("selected_indices", TensorProto.INT64, [4, 3])],
)
model = helper.make_model(graph, producer_name="nms_test_correctness")
# Use fixed inputs instead of random
inputs = {
"boxes": boxes_data,
"scores": scores_data,
}
check_correctness(model, inputs=inputs, opset=11)
def test_nms_iou_suppression():
"""Test that NMS correctly suppresses overlapping boxes based on IoU threshold."""
nms_node = helper.make_node(
"NonMaxSuppression",
["boxes", "scores", "max_output_boxes_per_class", "iou_threshold", "score_threshold"],
["selected_indices"],
center_point_box=0,
)
# Create overlapping boxes where box 0 has higher score and should be kept
boxes_data = np.array(
[
[
[0.0, 0.0, 1.0, 1.0], # Box 0: [0,0,1,1] - highest score
[
0.1,
0.1,
1.1,
1.1,
], # Box 1: [0.1,0.1,1.1,1.1] - high IoU with box 0, should be suppressed
[2.0, 2.0, 3.0, 3.0],
]
], # Box 2: [2,2,3,3] - no overlap, should be kept
dtype=np.float32,
)
# Box 0 has highest score, Box 1 should be suppressed due to IoU with box 0
scores_data = np.array([[[0.9, 0.8, 0.7]]], dtype=np.float32)
boxes_shape = [1, 3, 4]
scores_shape = [1, 1, 3]
graph = helper.make_graph(
[nms_node],
"nms_test_iou_suppression",
inputs=[
helper.make_tensor_value_info("boxes", TensorProto.FLOAT, boxes_shape),
helper.make_tensor_value_info("scores", TensorProto.FLOAT, scores_shape),
],
initializer=[
helper.make_tensor("max_output_boxes_per_class", TensorProto.INT64, [1], [2]),
helper.make_tensor("iou_threshold", TensorProto.FLOAT, [1], [0.5]), # IoU threshold 0.5
helper.make_tensor("score_threshold", TensorProto.FLOAT, [1], [0.1]),
],
outputs=[helper.make_tensor_value_info("selected_indices", TensorProto.INT64, [2, 3])],
)
model = helper.make_model(graph, producer_name="nms_test_iou_suppression")
model.opset_import[0].version = 11
inputs = {
"boxes": boxes_data,
"scores": scores_data,
}
# Run ONNX Runtime
ort_session = onnxruntime.InferenceSession(
model.SerializeToString(), providers=["CPUExecutionProvider"]
)
ort_output = ort_session.run([], inputs)
# Run TVM
tvm_model = from_onnx(model, opset=11, keep_params_in_input=True)
tvm_model = relax.transform.DecomposeOpsForInference()(tvm_model)
tvm_model = relax.transform.LegalizeOps()(tvm_model)
tvm_model, params = relax.frontend.detach_params(tvm_model)
with tvm.transform.PassContext(opt_level=3):
ex = tvm.compile(tvm_model, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
input_list = [
inputs[key.name_hint] for key in tvm_model["main"].params if key.name_hint in inputs
]
if params:
input_list += params["main"]
vm.set_input("main", *input_list)
vm.invoke_stateful("main")
tvm_output = vm.get_outputs("main")
# Custom NMS output comparison
if isinstance(tvm_output, (list, tuple)):
tvm_selected = tvm_output[0].numpy()
else:
tvm_selected = tvm_output.numpy()
ort_selected = ort_output[0]
# For NMS, compare only the valid rows
min_rows = min(tvm_selected.shape[0], ort_selected.shape[0])
if min_rows > 0:
tvm.testing.assert_allclose(
tvm_selected[:min_rows], ort_selected[:min_rows], rtol=1e-5, atol=1e-5
)
def test_nms_max_boxes_limit():
"""Test that NMS correctly limits the number of boxes per class."""
nms_node = helper.make_node(
"NonMaxSuppression",
["boxes", "scores", "max_output_boxes_per_class", "iou_threshold", "score_threshold"],
["selected_indices"],
center_point_box=0,
)
# Create data with 4 boxes, but limit to 2 per class
boxes_data = np.array(
[
[
[0.0, 0.0, 1.0, 1.0], # Box 0
[2.0, 0.0, 3.0, 1.0], # Box 1
[0.0, 2.0, 1.0, 3.0], # Box 2
[2.0, 2.0, 3.0, 3.0],
]
], # Box 3
dtype=np.float32,
)
# All boxes have different scores
scores_data = np.array([[[0.9, 0.8, 0.7, 0.6]]], dtype=np.float32)
boxes_shape = [1, 4, 4]
scores_shape = [1, 1, 4]
graph = helper.make_graph(
[nms_node],
"nms_test_max_boxes_limit",
inputs=[
helper.make_tensor_value_info("boxes", TensorProto.FLOAT, boxes_shape),
helper.make_tensor_value_info("scores", TensorProto.FLOAT, scores_shape),
],
initializer=[
helper.make_tensor(
"max_output_boxes_per_class", TensorProto.INT64, [1], [2]
), # Limit to 2 boxes
helper.make_tensor("iou_threshold", TensorProto.FLOAT, [1], [0.1]), # Low IoU threshold
helper.make_tensor("score_threshold", TensorProto.FLOAT, [1], [0.1]),
],
outputs=[helper.make_tensor_value_info("selected_indices", TensorProto.INT64, [2, 3])],
)
model = helper.make_model(graph, producer_name="nms_test_max_boxes_limit")
model.opset_import[0].version = 11
inputs = {
"boxes": boxes_data,
"scores": scores_data,
}
# Run ONNX Runtime
ort_session = onnxruntime.InferenceSession(
model.SerializeToString(), providers=["CPUExecutionProvider"]
)
ort_output = ort_session.run([], inputs)
# Run TVM
tvm_model = from_onnx(model, opset=11, keep_params_in_input=True)
tvm_model = relax.transform.DecomposeOpsForInference()(tvm_model)
tvm_model = relax.transform.LegalizeOps()(tvm_model)
tvm_model, params = relax.frontend.detach_params(tvm_model)
with tvm.transform.PassContext(opt_level=3):
ex = tvm.compile(tvm_model, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
input_list = [
inputs[key.name_hint] for key in tvm_model["main"].params if key.name_hint in inputs
]
if params:
input_list += params["main"]
vm.set_input("main", *input_list)
vm.invoke_stateful("main")
tvm_output = vm.get_outputs("main")
# Custom NMS output comparison
if isinstance(tvm_output, (list, tuple)):
tvm_selected = tvm_output[0].numpy()
else:
tvm_selected = tvm_output.numpy()
ort_selected = ort_output[0]
# For NMS, compare only the valid rows
min_rows = min(tvm_selected.shape[0], ort_selected.shape[0])
if min_rows > 0:
tvm.testing.assert_allclose(
tvm_selected[:min_rows], ort_selected[:min_rows], rtol=1e-5, atol=1e-5
)
def test_nms_score_threshold():
"""Test that NMS correctly filters boxes based on score threshold.
Note: This test uses a low score threshold (0.05) to ensure both TVM and ONNX Runtime
output the same fixed shape [3,3], allowing use of the standard check_correctness function.
"""
nms_node = helper.make_node(
"NonMaxSuppression",
["boxes", "scores", "max_output_boxes_per_class", "iou_threshold", "score_threshold"],
["selected_indices"],
center_point_box=0,
)
# Create data with varying scores - ensure we get exactly 3 boxes after NMS
boxes_data = np.array(
[
[[0.0, 0.0, 1.0, 1.0], [2.0, 0.0, 3.0, 1.0], [0.0, 2.0, 1.0, 3.0]] # Box 0 # Box 1
], # Box 2
dtype=np.float32,
)
# Scores: 0.9, 0.3, 0.1 - adjust score threshold to get exactly 3 boxes
scores_data = np.array([[[0.9, 0.3, 0.1]]], dtype=np.float32)
boxes_shape = [1, 3, 4]
scores_shape = [1, 1, 3]
graph = helper.make_graph(
[nms_node],
"nms_test_score_threshold",
inputs=[
helper.make_tensor_value_info("boxes", TensorProto.FLOAT, boxes_shape),
helper.make_tensor_value_info("scores", TensorProto.FLOAT, scores_shape),
],
initializer=[
helper.make_tensor("max_output_boxes_per_class", TensorProto.INT64, [1], [3]),
helper.make_tensor("iou_threshold", TensorProto.FLOAT, [1], [0.1]),
helper.make_tensor("score_threshold", TensorProto.FLOAT, [1], [0.05]),
],
outputs=[helper.make_tensor_value_info("selected_indices", TensorProto.INT64, [3, 3])],
)
model = helper.make_model(graph, producer_name="nms_test_score_threshold")
model.opset_import[0].version = 11
inputs = {
"boxes": boxes_data,
"scores": scores_data,
}
# Run ONNX Runtime
ort_session = onnxruntime.InferenceSession(
model.SerializeToString(), providers=["CPUExecutionProvider"]
)
ort_output = ort_session.run([], inputs)
# Run TVM
tvm_model = from_onnx(model, opset=11, keep_params_in_input=True)
tvm_model = relax.transform.DecomposeOpsForInference()(tvm_model)
tvm_model = relax.transform.LegalizeOps()(tvm_model)
tvm_model, params = relax.frontend.detach_params(tvm_model)
with tvm.transform.PassContext(opt_level=3):
ex = tvm.compile(tvm_model, target="llvm")
vm = relax.VirtualMachine(ex, tvm.cpu())
input_list = [
inputs[key.name_hint] for key in tvm_model["main"].params if key.name_hint in inputs
]
if params:
input_list += params["main"]
vm.set_input("main", *input_list)
vm.invoke_stateful("main")
tvm_output = vm.get_outputs("main")
# Custom NMS output comparison
if isinstance(tvm_output, (list, tuple)):
tvm_selected = tvm_output[0].numpy()
else:
tvm_selected = tvm_output.numpy()
ort_selected = ort_output[0]
# For NMS, compare only the valid rows
min_rows = min(tvm_selected.shape[0], ort_selected.shape[0])
if min_rows > 0:
tvm.testing.assert_allclose(
tvm_selected[:min_rows], ort_selected[:min_rows], rtol=1e-5, atol=1e-5
)
if __name__ == "__main__":
tvm.testing.main()