blob: 3e0fe7e31e50133d87457097385c476f9fa7b9a6 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import tvm
import tvm.testing
from tvm import te
target = "opencl"
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_ternary_expression():
def check_if_then_else(dev, n, dtype):
A = te.placeholder((n,), name="A", dtype=dtype)
true_value = tvm.tir.const(1, dtype=dtype)
false_value = tvm.tir.const(3, dtype=dtype)
max_lhs = tvm.tir.const(2, dtype=dtype)
max_rhs = tvm.tir.if_then_else(A[0] > 0, true_value, false_value)
C = te.compute((n,), lambda i: tvm.te.max(max_lhs, max_rhs), name="C")
func = te.create_prim_func([A, C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
sch.bind(x, "threadIdx.x")
fun = tvm.tir.build(sch.mod, target=target)
a = tvm.runtime.empty((n,), A.dtype, dev)
c = tvm.runtime.empty((n,), A.dtype, dev)
# Only need to test compiling here
fun(a, c)
def check_select(dev, n, dtype):
A = te.placeholder((n,), name="A", dtype=dtype)
true_value = tvm.tir.const(1, dtype=dtype)
false_value = tvm.tir.const(3, dtype=dtype)
max_lhs = tvm.tir.const(2, dtype=dtype)
max_rhs = tvm.tir.Select(A[0] > 0, true_value, false_value)
C = te.compute((n,), lambda i: tvm.te.max(max_lhs, max_rhs), name="C")
func = te.create_prim_func([A, C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
sch.bind(x, "threadIdx.x")
fun = tvm.tir.build(sch.mod, target=target)
a = tvm.runtime.empty((n,), A.dtype, dev)
c = tvm.runtime.empty((n,), A.dtype, dev)
# Only need to test compiling here
fun(a, c)
dev = tvm.device(target, 0)
check_if_then_else(dev, 1, "int8")
check_if_then_else(dev, 1, "uint8")
check_if_then_else(dev, 1, "int16")
check_if_then_else(dev, 1, "uint16")
check_select(dev, 1, "int8")
check_select(dev, 1, "uint8")
check_select(dev, 1, "int16")
check_select(dev, 1, "uint16")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_inf_nan():
def check_inf_nan(dev, n, value, dtype):
A = te.placeholder((n,), name="A", dtype=dtype)
inf_value = tvm.tir.const(value, dtype=dtype)
C = te.compute((n,), lambda i: inf_value, name="C")
func = te.create_prim_func([A, C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
sch.bind(x, "threadIdx.x")
fun = tvm.tir.build(sch.mod, target=target)
a = tvm.runtime.empty((n,), A.dtype, dev)
c = tvm.runtime.empty((n,), A.dtype, dev)
# Only need to test compiling here
fun(a, c)
dev = tvm.device(target, 0)
check_inf_nan(dev, 1, -float("inf"), "float32")
check_inf_nan(dev, 1, -float("inf"), "float64")
check_inf_nan(dev, 1, float("inf"), "float32")
check_inf_nan(dev, 1, float("inf"), "float64")
check_inf_nan(dev, 1, float("nan"), "float32")
check_inf_nan(dev, 1, float("nan"), "float64")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_max():
def check_max(dev, n, dtype):
A = te.placeholder((n,), name="A", dtype=dtype)
max_lhs = A[0] + tvm.tir.const(1, dtype=dtype)
max_rhs = tvm.tir.const(0, dtype=dtype)
C = te.compute((n,), lambda i: tvm.te.max(max_lhs, max_rhs), name="C")
func = te.create_prim_func([A, C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
sch.bind(x, "threadIdx.x")
fun = tvm.tir.build(sch.mod, target=target)
a = tvm.runtime.empty((n,), A.dtype, dev)
c = tvm.runtime.empty((n,), A.dtype, dev)
# Only need to test compiling here
fun(a, c)
dev = tvm.device(target, 0)
check_max(dev, 1, "int8")
check_max(dev, 1, "uint8")
check_max(dev, 1, "int16")
check_max(dev, 1, "uint16")
check_max(dev, 1, "float32")
check_max(dev, 1, "float64")
def test_opencl_erf():
def check_erf(dev, n, dtype):
A = te.placeholder((n,), name="A", dtype=dtype)
C = te.compute(A.shape, lambda *i: te.erf(A(*i)), name="C")
func = te.create_prim_func([A, C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
sch.bind(x, "threadIdx.x")
fun = tvm.tir.build(sch.mod, target=target)
source_str = fun.imports[0].inspect_source()
matches = re.findall("erf", source_str)
error_matches = re.findall("erff", source_str)
assert len(matches) == 1 and len(error_matches) == 0
dev = tvm.device(target, 0)
check_erf(dev, 1, "float32")
check_erf(dev, 1, "float64")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
def test_opencl_type_casting():
def check_type_casting(ctx, n, dtype):
block_size = 4
C = te.compute(
(n,),
lambda i: tvm.tir.Select(
tvm.tir.all(
*[
i // block_size == tvm.tir.const(3, "int32"),
i % 3 == tvm.tir.const(1, "int32"),
]
),
tvm.tir.const(1, dtype),
tvm.tir.const(0, dtype),
),
name="C",
)
# NOTE: test simple convert pattern
func = te.create_prim_func([C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
tx, vx = sch.split(x, factors=[None, block_size])
sch.bind(tx, "threadIdx.x")
sch.vectorize(vx)
fun = tvm.tir.build(sch.mod, target=target)
c = tvm.runtime.empty((n,), dtype, ctx)
assembly = fun.imports[0].inspect_source()
lcond = "convert_int4(((convert_uint4(((uint4)(((convert_int(get_local_id(0))) == 3), ((convert_int(get_local_id(0))) == 3), ((convert_int(get_local_id(0))) == 3), ((convert_int(get_local_id(0))) == 3)))))"
rcond = "(convert_uint4(((((int4)(((convert_int(get_local_id(0))))+(1*0), ((convert_int(get_local_id(0))))+(1*1), ((convert_int(get_local_id(0))))+(1*2), ((convert_int(get_local_id(0))))+(1*3))) % ((int4)(3, 3, 3, 3))) == ((int4)(1, 1, 1, 1))))))))"
pattern_cond = "({} && {})".format(lcond, rcond)
assert assembly.count(pattern_cond) != 0
fun(c)
dev = tvm.device(target, 0)
check_type_casting(dev, 32, "float32")
# fp16 is not yet supported in ci
# check_type_casting(dev, 16, "float16")
@tvm.testing.requires_gpu
@tvm.testing.requires_opencl
@tvm.testing.parametrize_targets("opencl", "opencl -device=adreno")
def test_opencl_ceil_log2(target):
def _check(target, n, dtype):
with tvm.target.Target(target):
C = te.compute(
(n,),
lambda i: tvm.topi.ceil_log2(i),
name="C",
)
func = te.create_prim_func([C])
sch = tvm.tir.Schedule(func)
(x,) = sch.get_loops(sch.get_block("C"))
sch.bind(x, "threadIdx.x")
fun = tvm.tir.build(sch.mod, target=target)
assembly = fun.imports[0].inspect_source()
if "adreno" in target:
pattern = "convert_float"
else:
pattern = "convert_double"
assert assembly.count(pattern) != 0
_check(target, 32, "float32")
def _get_maximum_kernel_args(source):
def get_kernel_args(source):
import re
p = re.tir.build(r"__kernel void .+\((.*)\)")
args = p.findall(source)
return args
args = get_kernel_args(source)
max_args = len(args[0].split(","))
for arg_line in args:
max_args = max(max_args, len(arg_line.split(",")))
return max_args
if __name__ == "__main__":
tvm.testing.main()