# blob: 30676715b8992ff36dd9e37c10b56b8258783e8d [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ruff: noqa: E712, F401, F821
import ctypes
import math

import numpy as np
import pytest
import scipy

import tvm
import tvm.testing
from tvm import te, tirx, topi
from tvm.contrib import clang, utils
from tvm.script import tirx as T
def test_nearbyint():
    """Check that tir nearbyint matches numpy's rint on random inputs."""
    m = te.var("m")
    A = te.placeholder((m,), name="A")
    A_rounded = te.compute((m,), lambda *i: tvm.tirx.nearbyint(A(*i)), name="A")
    # Lower the TE computation to TIR, schedule it, and build for the CPU.
    prim = te.create_prim_func([A, A_rounded])
    schedule = tvm.s_tir.Schedule(prim)
    built = tvm.compile(schedule.mod, target="llvm")
    device = tvm.cpu(0)
    count = 10
    a = tvm.runtime.tensor(np.random.uniform(high=100, size=count).astype(A.dtype), device)
    a_rounded = tvm.runtime.tensor(
        np.random.uniform(size=count).astype(A_rounded.dtype), device
    )
    built(a, a_rounded)
    # numpy's rint rounds halfway cases to the nearest even integer
    # (so 1.5 and 2.5 both become 2), which matches the libc default
    # rounding mode; under a different libc rounding mode the numpy
    # reference could disagree.
    tvm.testing.assert_allclose(a_rounded.numpy(), np.rint(a.numpy()))
def test_round_ties_to_even():
    """Test that tir.round uses ties-to-even (banker's rounding) semantics."""
    m = te.var("m")
    A = te.placeholder((m,), name="A")
    A_rounded = te.compute((m,), lambda *i: tvm.tirx.round(A(*i)), name="A")
    built = tvm.compile(
        tvm.s_tir.Schedule(te.create_prim_func([A, A_rounded])).mod, target="llvm"
    )
    device = tvm.cpu(0)
    # Midpoint inputs are exactly where ties-to-even and ties-away-from-zero
    # produce different answers.
    midpoints = np.array([0.5, 1.5, 2.5, 3.5, -0.5, -1.5, -2.5, -3.5], dtype="float32")
    want = np.array([0.0, 2.0, 2.0, 4.0, 0.0, -2.0, -2.0, -4.0], dtype="float32")
    src = tvm.runtime.tensor(midpoints, device)
    dst = tvm.runtime.tensor(np.zeros(len(midpoints), dtype="float32"), device)
    built(src, dst)
    tvm.testing.assert_allclose(dst.numpy(), want)
def test_round_intrinsics_on_int():
    """Rounding intrinsics should be the identity on integer/boolean operands."""
    i = tvm.tirx.Var("i", "int32")
    rounding_ops = (
        tvm.tirx.round,
        tvm.tirx.trunc,
        tvm.tirx.ceil,
        tvm.tirx.floor,
        tvm.tirx.nearbyint,
    )
    for op in rounding_ops:
        # Integer and boolean constants pass through unchanged...
        assert op(tvm.tirx.const(10, "int32")).value == 10
        assert op(tvm.tirx.const(True, "bool")).value == True
        # ...and an integer variable folds to the very same node.
        assert op(i).same_as(i)
    # isnan is constant-false for integer inputs.
    assert tvm.tirx.isnan(tvm.tirx.const(10, "int32")).value == False
def test_unary_intrin():
    """Compare a collection of unary tir intrinsics against numpy references."""
    cases = [
        (tvm.tirx.exp, np.exp),
        (tvm.tirx.exp10, lambda x: np.power(10, x)),
        (tvm.tirx.log2, np.log2),
        (tvm.tirx.log10, np.log10),
        (tvm.tirx.sinh, np.sinh),
        (tvm.tirx.cosh, np.cosh),
        (tvm.tirx.log1p, np.log1p),
        (tvm.tirx.asin, np.arcsin),
        (tvm.tirx.acos, np.arccos),
        (tvm.tirx.atan, np.arctan),
        (tvm.tirx.asinh, np.arcsinh),
        (tvm.tirx.acosh, np.arccosh),
        (tvm.tirx.atanh, np.arctanh),
        (tvm.tirx.erf, scipy.special.erf),
    ]

    def check_one(intrin, reference, atol=1e-5, rtol=1e-5):
        # Build a one-input elementwise kernel applying `intrin` and compare
        # its output against the numpy `reference` function.
        m = te.var("m")
        A = te.placeholder((m,), name="A")
        B = te.compute((m,), lambda *i: intrin(A(*i)), name="B")
        built = tvm.compile(
            tvm.s_tir.Schedule(te.create_prim_func([A, B])).mod, target="llvm"
        )
        device = tvm.cpu(0)
        count = 10
        src = tvm.runtime.tensor(
            np.random.uniform(0.1, 0.5, size=count).astype(A.dtype), device
        )
        dst = tvm.runtime.tensor(np.random.uniform(size=count).astype(A.dtype), device)
        built(src, dst)
        tvm.testing.assert_allclose(dst.numpy(), reference(src.numpy()), atol=atol, rtol=rtol)
        intrin_name = intrin.__name__
        if intrin_name in ("asin", "acos"):
            # Inputs outside [-1, 1] are out of domain and must come back NaN.
            half = 8 // 2
            bad = np.concatenate(
                [
                    np.random.uniform(1.1, 2.0, size=half),
                    np.random.uniform(-2.0, -1.1, size=half),
                ]
            ).astype(A.dtype)
            bad_in = tvm.runtime.tensor(bad, device)
            bad_out = tvm.runtime.tensor(np.empty_like(bad), device)
            built(bad_in, bad_out)
            assert np.all(np.isnan(bad_out.numpy()))
        if intrin_name == "exp":
            # Exercise exp on whole-number inputs across a wide range and
            # cross-check dtype and values against numpy.
            wide = np.random.randint(-20, 20, size=8).astype(A.dtype)
            wide_in = tvm.runtime.tensor(wide, device)
            wide_out = tvm.runtime.tensor(np.empty_like(wide), device)
            built(wide_in, wide_out)
            assert wide_out.numpy().dtype == np.float32
            tvm.testing.assert_allclose(
                wide_out.numpy(), np.exp(wide.astype(np.float32)), rtol=1e-5, atol=1e-5
            )

    for intrin, reference in cases:
        # Inverse trig approximations get a looser tolerance.
        tol = 1e-3 if intrin.__name__ in ("asin", "acos", "atan") else 1e-5
        check_one(intrin, reference, tol, tol)
def test_asin_acos_boundary_values():
    """Test asin and acos with boundary values and threshold switching."""
    cases = [
        (tvm.tirx.asin, np.arcsin),
        (tvm.tirx.acos, np.arccos),
    ]

    def check_one(intrin, reference):
        m = te.var("m")
        A = te.placeholder((m,), name="A")
        B = te.compute((m,), lambda *i: intrin(A(*i)), name="B")
        built = tvm.compile(
            tvm.s_tir.Schedule(te.create_prim_func([A, B])).mod, target="llvm"
        )
        device = tvm.cpu(0)

        def evaluate(values):
            # Run the compiled kernel on `values` and return the raw output.
            src = tvm.runtime.tensor(values, device)
            dst = tvm.runtime.tensor(np.empty_like(values), device)
            built(src, dst)
            return dst.numpy()

        # Exact boundary inputs +/-1.0 (should use the system library).
        boundary = np.array([1.0, -1.0], dtype=np.float32)
        tvm.testing.assert_allclose(
            evaluate(boundary), reference(boundary), atol=1e-5, rtol=1e-5
        )
        # Inputs exactly at the +/-0.5 threshold (should use the system library).
        threshold = np.array([0.5, -0.5], dtype=np.float32)
        tvm.testing.assert_allclose(
            evaluate(threshold), reference(threshold), atol=1e-4, rtol=1e-4
        )
        # Inputs just below the threshold (should use the Taylor series).
        inner = np.array([0.49, -0.49, 0.3, -0.3, 0.0], dtype=np.float32)
        tvm.testing.assert_allclose(evaluate(inner), reference(inner), atol=1e-3, rtol=1e-3)
        # Inputs outside [-1, 1] are out of domain and must produce NaN.
        invalid = np.array([1.1, -1.1, 2.0, -2.0], dtype=np.float32)
        assert np.all(np.isnan(evaluate(invalid))), "Out-of-domain inputs should return NaN"

    for intrin, reference in cases:
        check_one(intrin, reference)
def test_binary_intrin():
    """Compare binary tir intrinsics against their numpy counterparts."""
    cases = [
        (tvm.tirx.atan2, np.arctan2),
        (tvm.tirx.nextafter, np.nextafter),
        (tvm.tirx.copysign, np.copysign),
        (tvm.tirx.hypot, np.hypot),
    ]

    def check_one(intrin, reference):
        # Build a two-input elementwise kernel applying `intrin` and compare
        # its output against the numpy `reference` function.
        m = te.var("m")
        A = te.placeholder((m,), name="A")
        B = te.placeholder((m,), name="B")
        C = te.compute((m,), lambda *i: intrin(A(*i), B(*i)), name="C")
        built = tvm.compile(
            tvm.s_tir.Schedule(te.create_prim_func([A, B, C])).mod, target="llvm"
        )
        device = tvm.cpu(0)
        count = 10
        lhs = tvm.runtime.tensor(np.random.uniform(0, 1, size=count).astype(A.dtype), device)
        rhs = tvm.runtime.tensor(np.random.uniform(0, 1, size=count).astype(B.dtype), device)
        out = tvm.runtime.tensor(np.random.uniform(size=count).astype(A.dtype), device)
        built(lhs, rhs, out)
        tvm.testing.assert_allclose(
            out.numpy(), reference(lhs.numpy(), rhs.numpy()), atol=1e-5, rtol=1e-5
        )

    for intrin, reference in cases:
        check_one(intrin, reference)
def test_ldexp():
    """Check tir.ldexp(x, n) against numpy's ldexp (x * 2**n)."""
    m = te.var("m")
    A = te.placeholder((m,), name="A")
    B = te.placeholder((m,), name="B", dtype="int32")
    C = te.compute((m,), lambda *i: tvm.tirx.ldexp(A(*i), B(*i)), name="C")
    # Lower the TE computation to TIR, schedule it, and build for the CPU.
    built = tvm.compile(
        tvm.s_tir.Schedule(te.create_prim_func([A, B, C])).mod, target="llvm"
    )
    device = tvm.cpu(0)
    count = 10
    mantissas = tvm.runtime.tensor(np.random.uniform(0, 1, size=count).astype(A.dtype), device)
    exponents = tvm.runtime.tensor(np.random.randint(0, 5, size=count).astype(B.dtype), device)
    out = tvm.runtime.tensor(np.random.uniform(size=count).astype(A.dtype), device)
    built(mantissas, exponents, out)
    tvm.testing.assert_allclose(
        out.numpy(), np.ldexp(mantissas.numpy(), exponents.numpy()), atol=1e-5, rtol=1e-5
    )
dtype = tvm.testing.parameter("int32", "int64")


@tvm.testing.parametrize_targets("llvm", {"kind": "vulkan", "from_device": 0})
def test_clz(target, dev, dtype):
    """Validate tir.clz (count leading zeros) on llvm and vulkan targets."""
    target = tvm.target.Target(target)
    is_vulkan = target.kind.name == "vulkan"
    if is_vulkan and dtype == "int64" and not target.attrs.get("supports_int64", False):
        pytest.xfail("Vulkan target does not support Int64 types")

    def clz_np(x, dtype):
        # Leading-zero count: bit width minus ceil(log2(x)), with an
        # off-by-one correction for exact powers of two, where
        # ceil(log2) lands exactly on the value.
        ceil_log2 = np.ceil(np.log2(x)).astype(dtype)
        width = int(dtype[-2:])
        result = width - ceil_log2
        result[np.bitwise_and(x, x - 1) == 0] -= 1
        return result

    m = te.var("m")
    A = te.placeholder((m,), name="A", dtype=dtype)
    B = te.compute((m,), lambda *i: tvm.tirx.clz(A(*i)), name="B")
    sch = tvm.s_tir.Schedule(te.create_prim_func([A, B]))
    if is_vulkan:
        # GPU targets require the loop to be bound to block/thread indices.
        outer, inner = sch.split(
            sch.get_loops(sch.get_sblock("B"))[0], factors=[None, 64]
        )
        sch.bind(outer, "blockIdx.x")
        sch.bind(inner, "threadIdx.x")
    built = tvm.compile(sch.mod, target=target)
    count = 10
    upper_bounds = [10, 100, 1000, 10000, 100000, 1000000]
    if dtype == "int64":
        upper_bounds.append((1 << 63) - 1)
    for upper in upper_bounds:
        a_np = np.random.randint(1, high=upper, size=(count,), dtype=dtype)
        a = tvm.runtime.tensor(a_np, dev)
        b = tvm.runtime.tensor(np.zeros((count,)).astype("int32"), dev)
        built(a, b)
        np.testing.assert_equal(b.numpy(), clz_np(a_np, dtype))
@tvm.script.ir_module
class Module:
    # IRModule holding a single strided multiply-add prim_func; test_fma
    # below lowers it and checks that an llvm fma intrinsic is selected.
    # NOTE: only '#' comments are used here — the TVMScript parser consumes
    # the AST, so docstring expressions must not be introduced.
    @T.prim_func
    def test_tir_fma(A: T.handle, B: T.handle, C: T.handle, d: T.handle) -> None:
        # Elementwise d[i] = A[i] * B[i] + C[i] over four 1-D buffers that
        # share a symbolic extent n but each carry their own symbolic stride,
        # so non-contiguous inputs are accepted.
        # function attr dict
        T.func_attr({"global_symbol": "test_fma", "tirx.noalias": True})
        n = T.int32()
        stride = T.int32()
        stride_1 = T.int32()
        stride_2 = T.int32()
        stride_3 = T.int32()
        # Bind each handle to a 1-D buffer of extent n; buffer_type="auto"
        # leaves the exact buffer kind for the compiler to infer.
        A_1 = T.match_buffer(
            A,
            [n],
            strides=[stride],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        B_1 = T.match_buffer(
            B,
            [n],
            strides=[stride_1],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        C_1 = T.match_buffer(
            C,
            [n],
            strides=[stride_2],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        d_1 = T.match_buffer(
            d,
            [n],
            strides=[stride_3],
            elem_offset=0,
            align=64,
            offset_factor=1,
            buffer_type="auto",
        )
        # body
        for i in T.serial(0, n):
            # Index each buffer by its own stride.
            d_1[(i * stride_3)] = (A_1[(i * stride)] * B_1[(i * stride_1)]) + C_1[(i * stride_2)]
def test_fma():
    """Lowering the multiply-add in Module should select an llvm fma intrinsic."""
    pipeline = tvm.transform.Sequential(
        [
            # Attach an llvm target so LowerIntrin knows what to legalize for.
            tvm.tirx.transform.Apply(lambda f: f.with_attr("target", tvm.target.Target("llvm"))),
            tvm.tirx.transform.LowerIntrin(),
        ]
    )
    lowered = pipeline(Module)
    assert lowered["test_tir_fma"].body.body.value.op.name == "tirx.call_llvm_pure_intrin"
if __name__ == "__main__":
    # Use the TVM test runner instead of hand-listing tests. The previous
    # explicit call list had two defects: it never invoked
    # test_round_ties_to_even, and it called test_clz() with no arguments
    # even though test_clz requires (target, dev, dtype) fixtures and would
    # raise TypeError. tvm.testing.main() discovers every test in the file
    # and supplies the parametrized fixtures.
    tvm.testing.main()