blob: 2e0fd0fd31f6ab53d7ece7f1eb1782b6f70e5822 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import tvm
import tvm.testing
from tvm.script import tir as T, ir as I
target = "opencl"
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_ternary_expression():
def check_if_then_else(dev, n, dtype):
@I.ir_module
class Module:
@T.prim_func
def main(A: T.Buffer((1,), dtype), C: T.Buffer((1,), dtype)):
T.func_attr({"tir.noalias": True})
for i in T.thread_binding(1, thread="threadIdx.x"):
with T.sblock("C"):
v_i = T.axis.spatial(1, i)
T.reads(A[0])
T.writes(C[v_i])
C[v_i] = T.max(
T.Cast(dtype, 2),
T.if_then_else(
0 < T.Cast("int32", A[0]),
T.Cast(dtype, 1),
T.Cast(dtype, 3),
),
)
fun = tvm.tir.build(Module, target=target)
a = tvm.runtime.empty((n,), dtype, dev)
c = tvm.runtime.empty((n,), dtype, dev)
# Only need to test compiling here
fun(a, c)
def check_select(dev, n, dtype):
@I.ir_module
class Module:
@T.prim_func
def main(A: T.Buffer((1,), dtype), C: T.Buffer((1,), dtype)):
T.func_attr({"tir.noalias": True})
for i in T.thread_binding(1, thread="threadIdx.x"):
with T.sblock("C"):
v_i = T.axis.spatial(1, i)
T.reads(A[0])
T.writes(C[v_i])
C[v_i] = T.max(
T.Cast(dtype, 2),
T.Select(
0 < T.Cast("int32", A[0]),
T.Cast(dtype, 1),
T.Cast(dtype, 3),
),
)
fun = tvm.tir.build(Module, target=target)
a = tvm.runtime.empty((n,), dtype, dev)
c = tvm.runtime.empty((n,), dtype, dev)
# Only need to test compiling here
fun(a, c)
dev = tvm.device(target, 0)
check_if_then_else(dev, 1, "int8")
check_if_then_else(dev, 1, "uint8")
check_if_then_else(dev, 1, "int16")
check_if_then_else(dev, 1, "uint16")
check_select(dev, 1, "int8")
check_select(dev, 1, "uint8")
check_select(dev, 1, "int16")
check_select(dev, 1, "uint16")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_inf_nan():
def check_inf_nan(dev, n, value, dtype):
@I.ir_module
class Module:
@T.prim_func
def main(A: T.Buffer((1,), dtype), C: T.Buffer((1,), dtype)):
T.func_attr({"tir.noalias": True})
for i in T.thread_binding(1, thread="threadIdx.x"):
with T.sblock("C"):
v_i = T.axis.spatial(1, i)
T.reads()
T.writes(C[v_i])
C[v_i] = T.Cast(dtype, value)
fun = tvm.tir.build(Module, target=target)
a = tvm.runtime.empty((n,), dtype, dev)
c = tvm.runtime.empty((n,), dtype, dev)
# Only need to test compiling here
fun(a, c)
dev = tvm.device(target, 0)
check_inf_nan(dev, 1, -float("inf"), "float32")
check_inf_nan(dev, 1, -float("inf"), "float64")
check_inf_nan(dev, 1, float("inf"), "float32")
check_inf_nan(dev, 1, float("inf"), "float64")
check_inf_nan(dev, 1, float("nan"), "float32")
check_inf_nan(dev, 1, float("nan"), "float64")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_max():
def check_max(dev, n, dtype):
@I.ir_module
class Module:
@T.prim_func
def main(A: T.Buffer((1,), dtype), C: T.Buffer((1,), dtype)):
T.func_attr({"tir.noalias": True})
for i in T.thread_binding(1, thread="threadIdx.x"):
with T.sblock("C"):
v_i = T.axis.spatial(1, i)
T.reads(A[0])
T.writes(C[v_i])
C[v_i] = T.max(A[0] + T.Cast(dtype, 1), T.Cast(dtype, 0))
fun = tvm.tir.build(Module, target=target)
a = tvm.runtime.empty((n,), dtype, dev)
c = tvm.runtime.empty((n,), dtype, dev)
# Only need to test compiling here
fun(a, c)
dev = tvm.device(target, 0)
check_max(dev, 1, "int8")
check_max(dev, 1, "uint8")
check_max(dev, 1, "int16")
check_max(dev, 1, "uint16")
check_max(dev, 1, "float32")
check_max(dev, 1, "float64")
def test_opencl_erf():
def check_erf(dev, n, dtype):
@I.ir_module
class Module:
@T.prim_func
def main(A: T.Buffer((1,), dtype), C: T.Buffer((1,), dtype)):
T.func_attr({"tir.noalias": True})
for i0 in T.thread_binding(1, thread="threadIdx.x"):
with T.sblock("C"):
v_i0 = T.axis.spatial(1, i0)
T.reads(A[v_i0])
T.writes(C[v_i0])
C[v_i0] = T.erf(A[v_i0])
fun = tvm.tir.build(Module, target=target)
source_str = fun.imports[0].inspect_source()
matches = re.findall("erf", source_str)
error_matches = re.findall("erff", source_str)
assert len(matches) == 1 and len(error_matches) == 0
dev = tvm.device(target, 0)
check_erf(dev, 1, "float32")
check_erf(dev, 1, "float64")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_type_casting():
@I.ir_module
class Module:
@T.prim_func
def main(C: T.Buffer((32,), "float32")):
T.func_attr({"tir.noalias": True})
for i_0 in T.thread_binding(8, thread="threadIdx.x"):
for i_1 in T.vectorized(4):
with T.sblock("C"):
v_i = T.axis.spatial(32, i_0 * 4 + i_1)
T.reads()
T.writes(C[v_i])
C[v_i] = T.Select(
v_i // 4 == 3 and v_i % 3 == 1, T.float32(1.0), T.float32(0.0)
)
def check_type_casting(ctx, n, dtype):
fun = tvm.tir.build(Module, target=target)
c = tvm.runtime.empty((n,), dtype, ctx)
assembly = fun.imports[0].inspect_source()
lcond = "convert_int4(((convert_uint4(((uint4)(((convert_int(get_local_id(0))) == 3), ((convert_int(get_local_id(0))) == 3), ((convert_int(get_local_id(0))) == 3), ((convert_int(get_local_id(0))) == 3)))))"
rcond = "(convert_uint4(((((int4)(((convert_int(get_local_id(0))))+(1*0), ((convert_int(get_local_id(0))))+(1*1), ((convert_int(get_local_id(0))))+(1*2), ((convert_int(get_local_id(0))))+(1*3))) % ((int4)(3, 3, 3, 3))) == ((int4)(1, 1, 1, 1))))))))"
pattern_cond = "({} && {})".format(lcond, rcond)
assert assembly.count(pattern_cond) != 0
fun(c)
dev = tvm.device(target, 0)
check_type_casting(dev, 32, "float32")
# fp16 is not yet supported in ci
# check_type_casting(dev, 16, "float16")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
@tvm.testing.parametrize_targets("opencl", {"kind": "opencl", "device": "adreno"})
def test_opencl_ceil_log2(target):
def _check(target, n, dtype):
target_obj = tvm.target.Target(target)
is_adreno = "adreno" in target_obj.attrs.get("device", "")
inter_dtype = "float32" if is_adreno else "float64"
@I.ir_module
class Module:
@T.prim_func
def main(C: T.Buffer((n,), "int32")):
T.func_attr({"tir.noalias": True})
for i in T.thread_binding(n, thread="threadIdx.x"):
with T.sblock("C"):
v_i = T.axis.spatial(n, i)
T.reads()
T.writes(C[v_i])
C[v_i] = T.Cast("int32", T.ceil(T.log2(T.Cast(inter_dtype, v_i))))
fun = tvm.tir.build(Module, target=target)
assembly = fun.imports[0].inspect_source()
if is_adreno:
pattern = "convert_float"
else:
pattern = "convert_double"
assert assembly.count(pattern) != 0
_check(target, 32, "float32")
def _get_maximum_kernel_args(source):
def get_kernel_args(source):
import re
p = re.tir.build(r"__kernel void .+\((.*)\)")
args = p.findall(source)
return args
args = get_kernel_args(source)
max_args = len(args[0].split(","))
for arg_line in args:
max_args = max(max_args, len(arg_line.split(",")))
return max_args
if __name__ == "__main__":
tvm.testing.main()