| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import pytest |
| import tvm |
| from tvm import relay |
| from tvm.relay import testing |
| from tvm.relay.backend.interpreter import ConstructorValue |
| from tvm.relay import create_executor |
| from tvm.relay.prelude import Prelude, StaticTensorArrayOps |
| from tvm.relay.testing import count as count_, make_nat_value, make_nat_expr |
| |
| import numpy as np |
| |
| |
| def vmobj_to_list(mod, o, dtype="float32"): |
| _, tensor_nil, _, _, _, _, _, _, _ = mod.get_type(f"tensor_{dtype}_t") |
| if isinstance(o, tvm.nd.NDArray): |
| return [o.numpy().tolist()] |
| elif isinstance(o, tvm.runtime.container.ADT): |
| if len(o) == 0: |
| if tensor_nil.tag == o.tag: |
| return [0] |
| return [] |
| |
| result = [] |
| for f in o: |
| result.extend(vmobj_to_list(mod, f, dtype)) |
| return result |
| elif isinstance(o, tvm.relay.backend.interpreter.ConstructorValue): |
| if o.constructor.name_hint == "Cons": |
| tl = vmobj_to_list(mod, o.fields[1], dtype) |
| hd = vmobj_to_list(mod, o.fields[0], dtype) |
| hd.extend(tl) |
| return hd |
| elif o.constructor.name_hint == "Nil": |
| return [] |
| elif "tensor_nil" in o.constructor.name_hint: |
| return [0] |
| elif "tensor" in o.constructor.name_hint: |
| return [o.fields[0].numpy()] |
| else: |
| raise RuntimeError("Unknown object type: %s" % o.constructor.name_hint) |
| else: |
| raise RuntimeError("Unknown object type: %s" % type(o)) |
| |
| |
| def check_tensor_array(ta_mod, ref_res, *args, dtype="float32", rtol=1e-5): |
| for kind in ["debug", "vm"]: |
| for target, dev in [("llvm", tvm.cpu(0))]: # testing.enabled_targets(): |
| if kind == "debug" and dev.device_type != tvm.cpu().device_type: |
| continue |
| result = relay.create_executor(kind, mod=ta_mod, device=dev, target=target).evaluate()( |
| *args |
| ) |
| got = vmobj_to_list(ta_mod, result, dtype) |
| tvm.testing.assert_allclose(ref_res, got, rtol=rtol, atol=rtol) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_expand_dims(): |
| def run(dtype): |
| x = relay.var("x") |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| expand_dims_func = p.get_global_var("tensor_expand_dims", dtype) |
| tensor1 = p.get_tensor_ctor("tensor1", dtype) |
| mod["main"] = relay.Function([x], expand_dims_func(tensor1(x))) |
| x_np = np.random.uniform(low=0.0, high=8.0, size=(1,)).astype(dtype) |
| expected = [np.expand_dims(x_np, axis=0)] |
| check_tensor_array(mod, expected, x_np) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_constructor(): |
| def run(dtype): |
| x = relay.var("x") |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| mod["main"] = relay.Function([x], tensor_array(x)) |
| expected = np.array([0, 0, 0, 0, 0]) |
| check_tensor_array(mod, expected, 5, dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_read(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| l = relay.var("l") |
| i = relay.var("i") |
| read_func = p.get_global_var("tensor_array_read", dtype) |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| mod["main"] = relay.Function([l, i], read_func(tensor_array(l), i)) |
| expected = [0] |
| check_tensor_array(mod, expected, *(1, 0), dtype=dtype) |
| check_tensor_array(mod, expected, *(5, 1), dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_write(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| tensor_t = p.get_type("tensor_t", dtype) |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| init_tensor_array = tensor_array(relay.const(2)) |
| write_func = p.get_global_var("tensor_array_write", dtype) |
| tensor1 = p.get_tensor_ctor("tensor1", dtype) |
| tensor_array1 = write_func(init_tensor_array, relay.const(0), tensor1(v1)) |
| tensor_array2 = write_func(tensor_array1, relay.const(1), tensor1(v2)) |
| mod["main"] = relay.Function([v1, v2], tensor_array2) |
| expected = [3, 7] |
| check_tensor_array(mod, expected, *(3, 7), dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_stack(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| tensor_t = p.get_type("tensor_t", dtype) |
| rlist = p.mod.get_global_type_var(f"List") |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| tensor1 = p.get_tensor_ctor("tensor1", dtype) |
| write = p.get_global_var("tensor_array_write", dtype) |
| stack = p.get_global_var("tensor_array_stack", dtype) |
| # TODO extract test case from inference failures |
| # setting this wrong causes crashes |
| v = relay.var("v", shape=(1,), dtype=dtype) |
| init_tensor_array = tensor_array(relay.const(3)) |
| tensor_array1 = write(init_tensor_array, relay.const(0), tensor1(v)) |
| tensor_array2 = write(tensor_array1, relay.const(1), tensor1(v)) |
| tensor_array3 = write(tensor_array2, relay.const(2), tensor1(v)) |
| tensor_array4 = stack(tensor_array3) |
| mod["main"] = relay.Function([v], tensor_array4, tensor_t()) |
| t = np.random.uniform(low=0.0, high=8.0, size=(1,)).astype(dtype) |
| expected = [np.stack([t, t, t])] |
| check_tensor_array(mod, expected, t, dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_unstack(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| unstack_tensor1 = p.get_global_var("tensor_array_unstack_tensor1", dtype) |
| v = relay.var("v") |
| mod["main"] = relay.Function([v], unstack_tensor1(v)) |
| t = np.random.uniform(low=0.0, high=8.0, size=(1,)).astype(dtype) |
| check_tensor_array(mod, t, t, dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_take(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| take = p.get_global_var("tensor_take", dtype) |
| tensor2 = p.get_tensor_ctor("tensor2", dtype) |
| v = relay.var("v") |
| lower = relay.var("lower") |
| upper = relay.var("upper") |
| mod["main"] = relay.Function([v, lower, upper], take(tensor2(v), lower, upper)) |
| v_data = np.random.uniform(low=0.0, high=8.0, size=(10, 10)).astype(dtype) |
| expected = [np.take(v_data, range(2, 5), axis=0)] |
| check_tensor_array(mod, expected, *(v_data, 2, 5), dtype=dtype) |
| expected = [np.take(v_data, range(0, 9), axis=0)] |
| check_tensor_array(mod, expected, *(v_data, 0, 9), dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_concatenate(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| concat = p.get_global_var("tensor_concatenate", dtype) |
| tensor1 = p.get_tensor_ctor("tensor1", dtype) |
| v1 = relay.var("v1", shape=(tvm.tir.Any(),), dtype=dtype) |
| v2 = relay.var("v2", shape=(tvm.tir.Any(),), dtype=dtype) |
| mod["main"] = relay.Function([v1, v2], concat(tensor1(v1), tensor1(v2))) |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=(5,)).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=(5,)).astype(dtype) |
| expected = [np.concatenate((v1_data, v2_data))] |
| check_tensor_array(mod, expected, *(v1_data, v2_data), dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_concat(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| tensor_array1 = tensor_array(relay.const(2)) |
| write_func = p.get_global_var("tensor_array_write", dtype) |
| concat_func = p.get_global_var("tensor_array_concat", dtype) |
| tensor1 = p.get_tensor_ctor("tensor2", dtype) |
| tensor_array1 = write_func(tensor_array1, relay.const(0), tensor1(v1)) |
| tensor_array1 = write_func(tensor_array1, relay.const(1), tensor1(v2)) |
| tensor_array_concat = concat_func(tensor_array1) |
| mod["main"] = relay.Function([v1, v2], tensor_array_concat) |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=(1, 3)).astype(dtype) |
| expected = [np.concatenate((v1_data, v2_data), axis=0)] |
| check_tensor_array(mod, expected, *(v1_data, v2_data), dtype=dtype) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_scatter(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| |
| # tensor array |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| v3 = relay.var("v2") |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| tensor_array1 = tensor_array(relay.const(3)) |
| write_func = p.get_global_var("tensor_array_write", dtype) |
| scatter_func = p.get_global_var("tensor_array_scatter", dtype) |
| tensor2 = p.get_tensor_ctor("tensor2", dtype) |
| tensor_array1 = write_func(tensor_array1, relay.const(0), tensor2(v1)) |
| tensor_array1 = write_func(tensor_array1, relay.const(1), tensor2(v2)) |
| tensor_array1 = write_func(tensor_array1, relay.const(2), tensor2(v3)) |
| |
| # indices array |
| index = relay.var("index") |
| |
| # values array |
| value_0 = relay.var("value_0") |
| value_1 = relay.var("value_1") |
| values_array = tensor_array(relay.const(2)) |
| values_array = write_func(values_array, relay.const(0), tensor2(value_0)) |
| values_array = write_func(values_array, relay.const(1), tensor2(value_1)) |
| |
| # create the scatter function |
| tensor_array_scatter = scatter_func(tensor_array1, index, values_array) |
| mod["main"] = relay.Function([v1, v2, v3, index, value_0, value_1], tensor_array_scatter) |
| |
| # initialize and check |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| v3_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| index_data = np.array([0, 1], dtype="int32") |
| val1_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| val2_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| expected = [val1_data, val2_data, v3_data] |
| check_tensor_array( |
| mod, |
| expected, |
| *(v1_data, v2_data, v3_data, index_data, val1_data, val2_data), |
| dtype=dtype, |
| ) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_tensor_array_split(): |
| def run(dtype): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| |
| # tensor array |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| v3 = relay.var("v2") |
| tensor_array = p.get_global_var("tensor_array", dtype) |
| tensor_array1 = tensor_array(relay.const(3)) |
| write_func = p.get_global_var("tensor_array_write", dtype) |
| split_func = p.get_global_var("tensor_array_split", dtype) |
| tensor2 = p.get_tensor_ctor("tensor2", dtype) |
| tensor_array1 = write_func(tensor_array1, relay.const(0), tensor2(v1)) |
| tensor_array1 = write_func(tensor_array1, relay.const(1), tensor2(v2)) |
| tensor_array1 = write_func(tensor_array1, relay.const(2), tensor2(v3)) |
| |
| # value tensor |
| value = relay.var("value") |
| |
| # lengths tensor |
| ta_len = relay.var("length") |
| |
| # create the scatter function |
| tensor_array_split = split_func(tensor_array1, tensor2(value), ta_len) |
| mod["main"] = relay.Function([v1, v2, v3, value, ta_len], tensor_array_split) |
| |
| # initialize and check |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| v3_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| value_data = np.random.uniform(low=0.0, high=8.0, size=(4, 3)).astype(dtype) |
| length_data = np.array([2, 2], dtype="int32") |
| expected = np.concatenate([value_data, v3_data]) |
| expected = np.split(expected, indices_or_sections=[2, 4]) |
| check_tensor_array( |
| mod, expected, *(v1_data, v2_data, v3_data, value_data, length_data), dtype=dtype |
| ) |
| |
| run("float32") |
| run("int32") |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_take(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| take = p.get_global_var_static("tensor_take", dtype, shape) |
| tensor_constructor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| v = relay.var("v") |
| lower = relay.var("lower") |
| upper = relay.var("upper") |
| mod["main"] = relay.Function([v, lower, upper], take(tensor_constructor(v), lower, upper)) |
| v_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| expected = [np.take(v_data, range(2, 5), axis=0)] |
| check_tensor_array(mod, expected, *(v_data, 2, 5), dtype=dtype) |
| expected = [np.take(v_data, range(0, 9), axis=0)] |
| check_tensor_array(mod, expected, *(v_data, 0, 9), dtype=dtype) |
| |
| run("float32", [10, 10]) |
| run("int32", [15, 11]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_concatenate(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| concat = p.get_global_var_static("tensor_concatenate", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| mod["main"] = relay.Function([v1, v2], concat(tensor(v1), tensor(v2))) |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| expected = [np.concatenate((v1_data, v2_data))] |
| check_tensor_array(mod, expected, *(v1_data, v2_data), dtype=dtype) |
| |
| run( |
| "float32", |
| [ |
| 5, |
| ], |
| ) |
| run("int32", [2, 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_expand_dims(): |
| def run(dtype, shape): |
| x = relay.var("x") |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| expand_dims_func = p.get_global_var_static("tensor_expand_dims", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| mod["main"] = relay.Function([x], expand_dims_func(tensor(x))) |
| x_np = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| expected = [np.expand_dims(x_np, axis=0)] |
| check_tensor_array(mod, expected, x_np) |
| |
| run("float32", []) |
| run( |
| "int32", |
| [ |
| 2, |
| ], |
| ) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_constructor(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| tensor_constructor = p.get_name_static("tensor_constructor", dtype, shape) |
| assert tensor_constructor != None |
| |
| run("float32", [1, 1]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_read(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| np_data_list = [] |
| ta_length = 3 |
| for _ in range(ta_length): |
| np_data_list.append(np.random.uniform(0, 10, size=shape).astype(dtype)) |
| |
| v0 = relay.var("v0") |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| n = relay.var("n") |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| init_tensor_array = tensor_array(relay.const(ta_length)) |
| read_func = p.get_global_var_static("tensor_array_read", dtype, shape) |
| write_func = p.get_global_var_static("tensor_array_write", dtype, shape) |
| tensor_array0 = write_func(init_tensor_array, relay.const(0), tensor(v0)) |
| tensor_array1 = write_func(tensor_array0, relay.const(1), tensor(v1)) |
| tensor_array2 = write_func(tensor_array1, relay.const(2), tensor(v2)) |
| |
| mod["main"] = relay.Function([v0, v1, v2, n], read_func(tensor_array2, n)) |
| expected = [np_data_list[0]] |
| check_tensor_array(mod, expected, *list(np_data_list + [0]), dtype=dtype) |
| expected = [np_data_list[1]] |
| check_tensor_array(mod, expected, *list(np_data_list + [1]), dtype=dtype) |
| expected = [np_data_list[2]] |
| check_tensor_array(mod, expected, *list(np_data_list + [2]), dtype=dtype) |
| |
| run("float32", []) |
| run("int32", [2, 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_write(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| ta_length = 2 |
| np_data_list = [ |
| np.random.uniform(0, 10, size=shape).astype(dtype) for _ in range(ta_length) |
| ] |
| |
| v0 = relay.var("v0") |
| v1 = relay.var("v1") |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| init_tensor_array = tensor_array(relay.const(ta_length)) |
| write_func = p.get_global_var_static("tensor_array_write", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| tensor_array0 = write_func(init_tensor_array, relay.const(0), tensor(v0)) |
| tensor_array1 = write_func(tensor_array0, relay.const(1), tensor(v1)) |
| mod["main"] = relay.Function([v0, v1], tensor_array1) |
| expected = np_data_list |
| check_tensor_array(mod, expected, *np_data_list, dtype=dtype) |
| |
| run("float32", []) |
| run("int32", [2, 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_unstack(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| unstack_tensor = p.get_global_var_static("tensor_array_unstack", dtype, shape) |
| v = relay.var("v") |
| mod["main"] = relay.Function([v], unstack_tensor(v)) |
| t = np.random.uniform(low=0, high=10, size=shape).astype(dtype) |
| (*expected,) = t |
| check_tensor_array(mod, expected, t, dtype=dtype) |
| |
| run("float32", [4]) |
| run("int32", [2, 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_scatter(): |
| def run(dtype, shape, indices_shape=None): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| if indices_shape is not None: |
| static_tensor_array_ops.define_tensor_array_scatter(indices_shape, True) |
| |
| # tensor array |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| v3 = relay.var("v2") |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| tensor_array0 = tensor_array(relay.const(3)) |
| write_func = p.get_global_var_static("tensor_array_write", dtype, shape) |
| scatter_func = p.get_global_var_static("tensor_array_scatter", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| tensor_array1 = write_func(tensor_array0, relay.const(0), tensor(v1)) |
| tensor_array1 = write_func(tensor_array1, relay.const(1), tensor(v2)) |
| tensor_array1 = write_func(tensor_array1, relay.const(2), tensor(v3)) |
| |
| # indices array |
| index = relay.var("index") |
| |
| # values array |
| value_0 = relay.var("value_0") |
| value_1 = relay.var("value_1") |
| values_array = tensor_array(relay.const(2)) |
| values_array = write_func(values_array, relay.const(0), tensor(value_0)) |
| values_array = write_func(values_array, relay.const(1), tensor(value_1)) |
| |
| # create the scatter function |
| tensor_array_scatter = scatter_func(tensor_array1, index, values_array) |
| mod["main"] = relay.Function([v1, v2, v3, index, value_0, value_1], tensor_array_scatter) |
| |
| # initialize and check |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| v3_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| index_data = np.array([0, 1], dtype="int32") |
| val1_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| val2_data = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| expected = [val1_data, val2_data, v3_data] |
| check_tensor_array( |
| mod, |
| expected, |
| *(v1_data, v2_data, v3_data, index_data, val1_data, val2_data), |
| dtype=dtype, |
| ) |
| |
| run("float32", [2, 3]) |
| run("int32", [2, 3]) |
| run( |
| "float32", |
| [2, 3], |
| [ |
| 2, |
| ], |
| ) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_split(): |
| def run(dtype, shape, value_shape=None, lengths_shape=None): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| if value_shape is not None or lengths_shape is not None: |
| static_tensor_array_ops.define_tensor_array_split(value_shape, lengths_shape, False) |
| |
| # tensor array |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| v3 = relay.var("v2") |
| |
| adt_shape = [ |
| relay.Any(), |
| ] + shape[1:] |
| test_ops = StaticTensorArrayOps(p, dtype, adt_shape) |
| test_ops.register() |
| tensor_array = test_ops.get_global_var("tensor_array") |
| |
| tensor_array1 = tensor_array(relay.const(3)) |
| write_func = test_ops.get_global_var("tensor_array_write") |
| split_ops = StaticTensorArrayOps(p, dtype, shape) |
| split_ops.register() |
| split_func = split_ops.get_global_var("tensor_array_split") |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, test_ops.shape) |
| tensor_array1 = write_func(tensor_array1, relay.const(0), tensor(v1)) |
| tensor_array1 = write_func(tensor_array1, relay.const(1), tensor(v2)) |
| tensor_array1 = write_func(tensor_array1, relay.const(2), tensor(v3)) |
| |
| # value tensor |
| value = relay.var("value") |
| |
| # lengths tensor |
| ta_len = relay.var("length") |
| |
| # create the split function |
| if value_shape is None: |
| tensor1 = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| else: |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, value_shape) |
| static_tensor_array_ops.register() |
| tensor1 = p.get_tensor_ctor_static("tensor_constructor", dtype, test_ops.shape) |
| |
| tensor_array_split = split_func(tensor_array1, tensor1(value), ta_len) |
| mod["main"] = relay.Function([v1, v2, v3, value, ta_len], tensor_array_split) |
| |
| # initialize and check |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=[2, 3]).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=[2, 3]).astype(dtype) |
| v3_data = np.random.uniform(low=0.0, high=8.0, size=[2, 3]).astype(dtype) |
| value_data = np.random.uniform(low=0.0, high=8.0, size=value_shape or shape).astype(dtype) |
| length_data = np.array([2, 2], dtype="int32") |
| expected = np.concatenate([value_data, v3_data]) |
| expected = np.split(expected, indices_or_sections=[2, 4]) |
| check_tensor_array( |
| mod, expected, *(v1_data, v2_data, v3_data, value_data, length_data), dtype=dtype |
| ) |
| |
| run("float32", [4, 3]) |
| run("int32", [4, 3]) |
| run( |
| "int32", |
| [relay.Any(), 3], |
| [4, 3], |
| [ |
| 2, |
| ], |
| ) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_concat(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| tensor_array1 = tensor_array(relay.const(2)) |
| write_func = p.get_global_var_static("tensor_array_write", dtype, shape) |
| concat_func = p.get_global_var_static("tensor_array_concat", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| tensor_array1 = write_func(tensor_array1, relay.const(0), tensor(v1)) |
| tensor_array1 = write_func(tensor_array1, relay.const(1), tensor(v2)) |
| tensor_array_concat = concat_func(tensor_array1) |
| mod["main"] = relay.Function([v1, v2], tensor_array_concat) |
| v1_data = np.random.uniform(low=0.0, high=8.0, size=(2, 3)).astype(dtype) |
| v2_data = np.random.uniform(low=0.0, high=8.0, size=(1, 3)).astype(dtype) |
| expected = [np.concatenate((v1_data, v2_data), axis=0)] |
| check_tensor_array(mod, expected, *(v1_data, v2_data), dtype=dtype) |
| |
| run("float32", [relay.Any(), 3]) |
| run("int32", [relay.Any(), 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_gather(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| write = p.get_global_var_static("tensor_array_write", dtype, shape) |
| gather = p.get_global_var_static("tensor_array_gather", dtype, shape) |
| v = relay.var("v") |
| indice = relay.var("indice") |
| init_tensor_array = tensor_array(relay.const(3)) |
| tensor_array1 = write(init_tensor_array, relay.const(0), tensor(v)) |
| tensor_array2 = write(tensor_array1, relay.const(1), tensor(v)) |
| tensor_array3 = write(tensor_array2, relay.const(2), tensor(v)) |
| out = gather(tensor_array3, indice) |
| mod["main"] = relay.Function([v, indice], out) |
| t = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| indice_data = np.array([0, 2], dtype="int32") |
| expected = [np.stack([t, t])] |
| check_tensor_array(mod, expected, *(t, indice_data), dtype=dtype) |
| |
| run("float32", []) |
| run("int32", [2, 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_array_stack(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| write = p.get_global_var_static("tensor_array_write", dtype, shape) |
| stack = p.get_global_var_static("tensor_array_stack", dtype, shape) |
| v = relay.var("v") |
| init_tensor_array = tensor_array(relay.const(3)) |
| tensor_array1 = write(init_tensor_array, relay.const(0), tensor(v)) |
| tensor_array2 = write(tensor_array1, relay.const(1), tensor(v)) |
| tensor_array3 = write(tensor_array2, relay.const(2), tensor(v)) |
| tensor_array4 = stack(tensor_array3) |
| mod["main"] = relay.Function([v], tensor_array4) |
| t = np.random.uniform(low=0.0, high=8.0, size=shape).astype(dtype) |
| expected = [np.stack([t, t, t])] |
| check_tensor_array(mod, expected, t, dtype=dtype) |
| |
| run("float32", []) |
| run("int32", [2, 3]) |
| |
| |
| @tvm.testing.uses_gpu |
| def test_static_tensor_get_data(): |
| def run(dtype, shape): |
| mod = tvm.IRModule() |
| p = Prelude(mod) |
| static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape) |
| static_tensor_array_ops.register() |
| |
| np_data_list = [] |
| ta_length = 3 |
| for _ in range(ta_length): |
| np_data_list.append(np.random.uniform(0, 10, size=shape).astype(dtype)) |
| |
| v0 = relay.var("v0") |
| v1 = relay.var("v1") |
| v2 = relay.var("v2") |
| n = relay.var("n") |
| tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape) |
| tensor_array = p.get_global_var_static("tensor_array", dtype, shape) |
| init_tensor_array = tensor_array(relay.const(ta_length)) |
| read_func = p.get_global_var_static("tensor_array_read", dtype, shape) |
| write_func = p.get_global_var_static("tensor_array_write", dtype, shape) |
| get_data_func = p.get_global_var_static("tensor_get_data", dtype, shape) |
| tensor_array0 = write_func(init_tensor_array, relay.const(0), tensor(v0)) |
| tensor_array1 = write_func(tensor_array0, relay.const(1), tensor(v1)) |
| tensor_array2 = write_func(tensor_array1, relay.const(2), tensor(v2)) |
| |
| mod["main"] = relay.Function([v0, v1, v2, n], get_data_func(read_func(tensor_array2, n))) |
| expected = [np_data_list[0]] |
| check_tensor_array(mod, expected, *list(np_data_list + [0]), dtype=dtype) |
| expected = [np_data_list[1]] |
| check_tensor_array(mod, expected, *list(np_data_list + [1]), dtype=dtype) |
| expected = [np_data_list[2]] |
| check_tensor_array(mod, expected, *list(np_data_list + [2]), dtype=dtype) |
| |
| run("float32", []) |
| run("int32", [2, 3]) |
| |
| |
| if __name__ == "__main__": |
| tvm.testing.main() |