| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ruff: noqa: E712, F401, F821 |
| import ctypes |
| import math |
| |
| import numpy as np |
| import scipy |
| |
| import tvm |
| import tvm.testing |
| from tvm import te, tirx, topi |
| from tvm.contrib import clang, utils |
| from tvm.script import tirx as T |
| |
| |
def test_nearbyint():
    """Round-to-nearest via tvm.tirx.nearbyint, checked against np.rint."""
    m = te.var("m")
    A = te.placeholder((m,), name="A")
    A_rounded = te.compute((m,), lambda *i: tvm.tirx.nearbyint(A(*i)), name="A")

    # Lower the TE computation to TIR and build for the host CPU.
    prim = te.create_prim_func([A, A_rounded])
    schedule = tvm.s_tir.Schedule(prim)
    built = tvm.compile(schedule.mod, target="llvm")

    device = tvm.cpu(0)
    size = 10
    a = tvm.runtime.tensor(np.random.uniform(high=100, size=size).astype(A.dtype), device)
    a_rounded = tvm.runtime.tensor(np.random.uniform(size=size).astype(A_rounded.dtype), device)
    built(a, a_rounded)
    # numpy's rint rounds halfway cases to the nearest even integer, so both
    # 1.5 and 2.5 map to 2.  That matches libc's default rounding mode; if a
    # different rounding mode were active, the numpy reference could disagree.
    tvm.testing.assert_allclose(a_rounded.numpy(), np.rint(a.numpy()))
| |
| |
def test_round_ties_to_even():
    """Test that tir.round uses ties-to-even (banker's rounding) semantics."""
    length = te.var("m")
    A = te.placeholder((length,), name="A")
    A_rounded = te.compute((length,), lambda *i: tvm.tirx.round(A(*i)), name="A")

    # Lower to TIR and build for the host CPU.
    built = tvm.compile(tvm.s_tir.Schedule(te.create_prim_func([A, A_rounded])).mod, target="llvm")

    device = tvm.cpu(0)
    # Midpoints are exactly where ties-to-even and ties-away-from-zero diverge.
    inputs = np.array([0.5, 1.5, 2.5, 3.5, -0.5, -1.5, -2.5, -3.5], dtype="float32")
    expected = np.array([0.0, 2.0, 2.0, 4.0, 0.0, -2.0, -2.0, -4.0], dtype="float32")

    arg = tvm.runtime.tensor(inputs, device)
    out = tvm.runtime.tensor(np.zeros(len(inputs), dtype="float32"), device)
    built(arg, out)
    tvm.testing.assert_allclose(out.numpy(), expected)
| |
| |
def test_round_intrinsics_on_int():
    """Rounding intrinsics must act as the identity on integer/bool operands."""
    i = tvm.tirx.Var("i", "int32")
    intrinsics = (tvm.tirx.round, tvm.tirx.trunc, tvm.tirx.ceil, tvm.tirx.floor, tvm.tirx.nearbyint)
    for op in intrinsics:
        # Integer/bool constants fold to themselves; a symbolic int is
        # returned untouched (same object, not merely an equal value).
        assert op(tvm.tirx.const(10, "int32")).value == 10
        assert op(tvm.tirx.const(True, "bool")).value == True
        assert op(i).same_as(i)

    # An integer is never NaN, so isnan folds to a constant False.
    assert tvm.tirx.isnan(tvm.tirx.const(10, "int32")).value == False
| |
| |
def test_unary_intrin():
    """Check elementwise unary intrinsics against numpy/scipy references."""
    test_funcs = [
        (tvm.tirx.exp, lambda x: np.exp(x)),
        (tvm.tirx.exp10, lambda x: np.power(10, x)),
        (tvm.tirx.log2, lambda x: np.log2(x)),
        (tvm.tirx.log10, lambda x: np.log10(x)),
        (tvm.tirx.sinh, lambda x: np.sinh(x)),
        (tvm.tirx.cosh, lambda x: np.cosh(x)),
        (tvm.tirx.log1p, lambda x: np.log1p(x)),
        (tvm.tirx.asin, lambda x: np.arcsin(x)),
        (tvm.tirx.acos, lambda x: np.arccos(x)),
        (tvm.tirx.atan, lambda x: np.arctan(x)),
        (tvm.tirx.asinh, lambda x: np.arcsinh(x)),
        (tvm.tirx.acosh, lambda x: np.arccosh(x)),
        (tvm.tirx.atanh, lambda x: np.arctanh(x)),
        (tvm.tirx.erf, lambda x: scipy.special.erf(x)),
    ]

    def run_test(tvm_intrin, np_func, atol=1e-5, rtol=1e-5):
        m = te.var("m")
        A = te.placeholder((m,), name="A")
        B = te.compute((m,), lambda *i: tvm_intrin(A(*i)), name="B")

        # Lower to TIR and build for the host CPU.
        built = tvm.compile(tvm.s_tir.Schedule(te.create_prim_func([A, B])).mod, target="llvm")

        device = tvm.cpu(0)
        size = 10
        a = tvm.runtime.tensor(np.random.uniform(0.1, 0.5, size=size).astype(A.dtype), device)
        b = tvm.runtime.tensor(np.random.uniform(size=size).astype(A.dtype), device)
        built(a, b)
        tvm.testing.assert_allclose(b.numpy(), np_func(a.numpy()), atol=atol, rtol=rtol)

        name = tvm_intrin.__name__
        if name in ("asin", "acos"):
            # Inputs outside [-1, 1] are out of domain; every output must be NaN.
            half = 4
            out_np = np.concatenate(
                [
                    np.random.uniform(1.1, 2.0, size=half),
                    np.random.uniform(-2.0, -1.1, size=half),
                ]
            ).astype(A.dtype)
            arg = tvm.runtime.tensor(out_np, device)
            res = tvm.runtime.tensor(np.empty_like(out_np), device)
            built(arg, res)
            assert np.all(np.isnan(res.numpy()))
        if name == "exp":
            # Integer-valued exponents (negative and positive), as float input.
            out_np = np.random.randint(-20, 20, size=8).astype(A.dtype)
            arg = tvm.runtime.tensor(out_np, device)
            res = tvm.runtime.tensor(np.empty_like(out_np), device)
            built(arg, res)
            assert res.numpy().dtype == np.float32
            # Verify correctness against numpy's exp on the same inputs.
            tvm.testing.assert_allclose(
                res.numpy(), np.exp(out_np.astype(np.float32)), rtol=1e-5, atol=1e-5
            )

    for entry in test_funcs:
        # Inverse-trig intrinsics use approximations; loosen their tolerance.
        tol = 1e-3 if entry[0].__name__ in ["asin", "acos", "atan"] else 1e-5
        run_test(*entry, tol, tol)
| |
| |
def test_asin_acos_boundary_values():
    """Test asin and acos with boundary values and threshold switching."""
    test_funcs = [
        (tvm.tirx.asin, lambda x: np.arcsin(x)),
        (tvm.tirx.acos, lambda x: np.arccos(x)),
    ]

    def run_test(tvm_intrin, np_func):
        m = te.var("m")
        A = te.placeholder((m,), name="A")
        B = te.compute((m,), lambda *i: tvm_intrin(A(*i)), name="B")

        # Lower to TIR and build for the host CPU.
        built = tvm.compile(tvm.s_tir.Schedule(te.create_prim_func([A, B])).mod, target="llvm")

        device = tvm.cpu(0)

        # (inputs, tolerance) groups: the exact +/-1 boundaries, the |x| = 0.5
        # threshold (both presumably handled by the system library — TODO
        # confirm), and values just below the threshold (Taylor-series path).
        cases = [
            (np.array([1.0, -1.0], dtype=np.float32), 1e-5),
            (np.array([0.5, -0.5], dtype=np.float32), 1e-4),
            (np.array([0.49, -0.49, 0.3, -0.3, 0.0], dtype=np.float32), 1e-3),
        ]
        for values, tol in cases:
            arg = tvm.runtime.tensor(values, device)
            out = tvm.runtime.tensor(np.empty_like(values), device)
            built(arg, out)
            tvm.testing.assert_allclose(out.numpy(), np_func(values), atol=tol, rtol=tol)

        # |x| > 1 is outside the domain and must produce NaN.
        out_of_domain = np.array([1.1, -1.1, 2.0, -2.0], dtype=np.float32)
        arg = tvm.runtime.tensor(out_of_domain, device)
        out = tvm.runtime.tensor(np.empty_like(out_of_domain), device)
        built(arg, out)
        assert np.all(np.isnan(out.numpy())), "Out-of-domain inputs should return NaN"

    for entry in test_funcs:
        run_test(*entry)
| |
| |
def test_binary_intrin():
    """Check elementwise binary intrinsics against their numpy references."""
    test_funcs = [
        (tvm.tirx.atan2, lambda x1, x2: np.arctan2(x1, x2)),
        (tvm.tirx.nextafter, lambda x1, x2: np.nextafter(x1, x2)),
        (tvm.tirx.copysign, lambda x1, x2: np.copysign(x1, x2)),
        (tvm.tirx.hypot, lambda x1, x2: np.hypot(x1, x2)),
    ]

    def run_test(tvm_intrin, np_func):
        m = te.var("m")
        A = te.placeholder((m,), name="A")
        B = te.placeholder((m,), name="B")
        C = te.compute((m,), lambda *i: tvm_intrin(A(*i), B(*i)), name="C")

        # Lower to TIR and build for the host CPU.
        built = tvm.compile(tvm.s_tir.Schedule(te.create_prim_func([A, B, C])).mod, target="llvm")

        device = tvm.cpu(0)
        size = 10
        a = tvm.runtime.tensor(np.random.uniform(0, 1, size=size).astype(A.dtype), device)
        b = tvm.runtime.tensor(np.random.uniform(0, 1, size=size).astype(B.dtype), device)
        c = tvm.runtime.tensor(np.random.uniform(size=size).astype(A.dtype), device)
        built(a, b, c)
        tvm.testing.assert_allclose(c.numpy(), np_func(a.numpy(), b.numpy()), atol=1e-5, rtol=1e-5)

    for entry in test_funcs:
        run_test(*entry)
| |
| |
def test_ldexp():
    """Check tvm.tirx.ldexp (x * 2**e with integer exponent) against numpy."""
    m = te.var("m")
    A = te.placeholder((m,), name="A")
    B = te.placeholder((m,), name="B", dtype="int32")
    C = te.compute((m,), lambda *i: tvm.tirx.ldexp(A(*i), B(*i)), name="C")

    # Lower to TIR and build for the host CPU.
    built = tvm.compile(tvm.s_tir.Schedule(te.create_prim_func([A, B, C])).mod, target="llvm")

    device = tvm.cpu(0)
    size = 10
    a = tvm.runtime.tensor(np.random.uniform(0, 1, size=size).astype(A.dtype), device)
    b = tvm.runtime.tensor(np.random.randint(0, 5, size=size).astype(B.dtype), device)
    c = tvm.runtime.tensor(np.random.uniform(size=size).astype(A.dtype), device)
    built(a, b, c)
    tvm.testing.assert_allclose(c.numpy(), np.ldexp(a.numpy(), b.numpy()), atol=1e-5, rtol=1e-5)
| |
| |
# Pytest-level parameter: runs each test that takes `dtype` once per width.
dtype = tvm.testing.parameter("int32", "int64")
| |
| |
@tvm.testing.parametrize_targets("llvm", {"kind": "vulkan", "from_device": 0})
def test_clz(target, dev, dtype):
    """Check tvm.tirx.clz (count leading zeros) against a numpy reference.

    Parametrized over llvm/vulkan targets and int32/int64 inputs; the vulkan
    int64 combination is xfailed when the device lacks Int64 support.
    """
    # Fix: pytest was referenced below but never imported anywhere in this
    # file, so the xfail path raised NameError instead of xfailing.
    import pytest

    target = tvm.target.Target(target)
    if (
        target.kind.name == "vulkan"
        and dtype == "int64"
        and not target.attrs.get("supports_int64", False)
    ):
        pytest.xfail("Vulkan target does not support Int64 types")

    def clz_np(x, dtype):
        # Reference count-leading-zeros for positive integers of the given width.
        ceil_log2 = np.ceil(np.log2(x)).astype(dtype)
        bits = int(dtype[-2:])  # "int32" -> 32, "int64" -> 64
        clz = bits - ceil_log2
        # For exact powers of two ceil(log2(x)) == log2(x), which over-counts
        # the leading zeros by one; correct those entries.
        clz[np.bitwise_and(x, x - 1) == 0] -= 1
        return clz

    m = te.var("m")
    A = te.placeholder((m,), name="A", dtype=dtype)
    B = te.compute((m,), lambda *i: tvm.tirx.clz(A(*i)), name="B")

    # Convert to TIR and create schedule
    mod = te.create_prim_func([A, B])
    sch = tvm.s_tir.Schedule(mod)

    # Vulkan needs an explicit thread binding before building.
    if target.kind.name == "vulkan":
        block = sch.get_sblock("B")
        loop = sch.get_loops(block)[0]
        bx, tx = sch.split(loop, factors=[None, 64])
        sch.bind(bx, "blockIdx.x")
        sch.bind(tx, "threadIdx.x")

    # Build from scheduled TIR
    func = tvm.compile(sch.mod, target=target)

    n = 10
    highs = [10, 100, 1000, 10000, 100000, 1000000]

    if dtype == "int64":
        # Also exercise values near the int64 maximum.
        highs.append((1 << 63) - 1)

    for high in highs:
        a_np = np.random.randint(1, high=high, size=(n,), dtype=dtype)
        a = tvm.runtime.tensor(a_np, dev)
        b = tvm.runtime.tensor(np.zeros((n,)).astype("int32"), dev)
        func(a, b)
        ref = clz_np(a_np, dtype)
        np.testing.assert_equal(b.numpy(), ref)
| |
| |
# Hand-written TVMScript module: an elementwise multiply-add kernel
# d[i] = A[i] * B[i] + C[i] over 1-D strided buffers. test_fma() below runs
# LowerIntrin on it and checks the expression becomes an LLVM fma intrinsic.
@tvm.script.ir_module
class Module:
    @T.prim_func
    def test_tir_fma(A: T.handle, B: T.handle, C: T.handle, d: T.handle) -> None:
        # function attr dict
        T.func_attr({"global_symbol": "test_fma", "tirx.noalias": True})
        # Symbolic extent and one symbolic stride per buffer, bound when the
        # handles are matched below.
        n = T.int32()
        stride = T.int32()
        stride_1 = T.int32()
        stride_2 = T.int32()
        stride_3 = T.int32()
        A_1 = T.match_buffer(
            A,
            [n],
            strides=[stride],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        B_1 = T.match_buffer(
            B,
            [n],
            strides=[stride_1],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        C_1 = T.match_buffer(
            C,
            [n],
            strides=[stride_2],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        d_1 = T.match_buffer(
            d,
            [n],
            strides=[stride_3],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        # body
        for i in T.serial(0, n):
            d_1[(i * stride_3)] = (A_1[(i * stride)] * B_1[(i * stride_1)]) + C_1[(i * stride_2)]
| |
| |
def test_fma():
    """LowerIntrin should rewrite a*b + c in Module into an LLVM fma call."""
    passes = tvm.transform.Sequential(
        [
            tvm.tirx.transform.Apply(lambda f: f.with_attr("target", tvm.target.Target("llvm"))),
            tvm.tirx.transform.LowerIntrin(),
        ]
    )
    lowered = passes(Module)
    # After lowering, the stored value must be a call to the pure LLVM
    # intrinsic builtin.
    assert lowered["test_tir_fma"].body.body.value.op.name == "tirx.call_llvm_pure_intrin"
| |
| |
if __name__ == "__main__":
    # Manual runner for executing this file directly (outside pytest).
    test_nearbyint()
    # Fix: test_round_ties_to_even was defined but never invoked here.
    test_round_ties_to_even()
    test_unary_intrin()
    test_round_intrinsics_on_int()
    test_asin_acos_boundary_values()
    test_binary_intrin()
    test_ldexp()
    # NOTE(review): test_clz takes (target, dev, dtype) fixtures supplied by
    # tvm.testing under pytest; confirm this bare call works outside pytest.
    test_clz()
    test_fma()