blob: d1d2b9902c60c8e81035fc4980439e0e320b9c75 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test elementwise integration."""
import numpy as np
import tvm
import tvm.testing
from tvm import te
from tvm.contrib import nvcc
@tvm.testing.requires_gpu
def test_exp():
"""Test scheduling and running exponent."""
# graph
arr_length = 1024
arr_length_tvm = tvm.runtime.convert(arr_length)
placeholder_a = te.placeholder((arr_length_tvm,), name="A")
placeholder_b = te.compute(placeholder_a.shape, lambda *i: te.exp(placeholder_a(*i)), name="B")
schedule = te.create_schedule(placeholder_b.op)
# create iter var and assign them tags.
num_thread = 8
axis1, axis2 = schedule[placeholder_b].split(placeholder_b.op.axis[0], factor=num_thread)
schedule[placeholder_b].bind(axis1, te.thread_axis("blockIdx.x"))
schedule[placeholder_b].bind(axis2, te.thread_axis("threadIdx.x"))
# one line to build the function.
def check_device(device, host="stackvm"):
if not tvm.testing.device_enabled(host):
return
dev = tvm.device(device, 0)
if not tvm.testing.device_enabled(device):
print("skip because %s is not enabled.." % device)
return
fexp = tvm.build(schedule, [placeholder_a, placeholder_b], device, host, name="myexp")
dev = tvm.device(device, 0)
# launch the kernel.
buff_a = tvm.nd.array(np.random.uniform(size=arr_length).astype(placeholder_a.dtype), dev)
buff_b = tvm.nd.array(np.zeros(arr_length, dtype=placeholder_b.dtype), dev)
fexp(buff_a, buff_b)
tvm.testing.assert_allclose(buff_b.numpy(), np.exp(buff_a.numpy()), rtol=1e-5)
check_device("opencl -device=intel_graphics")
check_device("cuda", "llvm")
check_device("vulkan")
@tvm.testing.requires_gpu
def test_fmod():
"""Test scheduling and running fmod."""
# graph
def run(dtype):
size_var_n = te.size_var("n")
placeholder_a = te.placeholder((size_var_n,), name="A", dtype=dtype)
placeholder_b = te.placeholder((size_var_n,), name="B", dtype=dtype)
result_c = te.compute(
placeholder_a.shape, lambda *i: te.fmod(placeholder_a(*i), placeholder_b(*i)), name="C"
)
schedule = te.create_schedule(result_c.op)
# create iter var and assign them tags.
num_thread = 8
axis0, axis1 = schedule[result_c].split(result_c.op.axis[0], factor=num_thread)
def check_device(device):
dev = tvm.device(device, 0)
if not tvm.testing.device_enabled(device):
print("skip because %s is not enabled.." % device)
return
target = tvm.target.Target(device)
if "cpu" not in target.keys:
schedule[result_c].bind(axis0, te.thread_axis("blockIdx.x"))
schedule[result_c].bind(axis1, te.thread_axis("threadIdx.x"))
fmod = tvm.build(
schedule, [placeholder_a, placeholder_b, result_c], device, name="myfmod"
)
# launch the kernel.
value_n = 1024
a_np = (np.random.uniform(size=value_n) * 256).astype(placeholder_a.dtype)
b_np = (np.random.uniform(size=value_n) * 256).astype(placeholder_b.dtype)
# "fix" the values in a and b to avoid the result being too small
b_np += (b_np < 2.0) * 2
a_np[np.abs(np.fmod(a_np, b_np)) < 1] += 1
buff_a = tvm.nd.array(a_np, dev)
buff_b = tvm.nd.array(b_np, dev)
buff_c = tvm.nd.array(np.zeros(value_n, dtype=result_c.dtype), dev)
ftimer = fmod.time_evaluator(fmod.entry_name, dev, number=1)
_ = ftimer(buff_a, buff_b, buff_c).mean
np.testing.assert_allclose(
buff_c.numpy(), np.mod(buff_a.numpy(), buff_b.numpy()), rtol=1e-5
)
check_device("cuda")
check_device("opencl -device=intel_graphics")
check_device("metal")
run("float32")
@tvm.testing.requires_gpu
def test_multiple_cache_write():
"""Test multiple cache writes."""
# graph
arr_length = 1024
arr_length_tvm = tvm.runtime.convert(arr_length)
placeholder_a0 = te.placeholder((arr_length_tvm,), name="A0", dtype="float32")
placeholder_a1 = te.placeholder((arr_length_tvm,), name="A1", dtype="float32")
result_b0, result_b1 = te.compute(
(arr_length_tvm,),
lambda *i: (
placeholder_a0(*i) + placeholder_a1(*i),
placeholder_a0(*i) * placeholder_a1(*i),
),
name="B",
)
result_c = te.compute((arr_length_tvm,), lambda *i: result_b0(*i) + result_b1(*i), name="C")
schedule = te.create_schedule(result_c.op)
# create iter var and assign them tags.
num_thread = 8
cache_b0, _ = schedule.cache_write([result_b0, result_b1], "local")
axis0, axis1 = schedule[result_c].split(result_c.op.axis[0], factor=num_thread)
schedule[result_b0].compute_at(schedule[result_c], axis0)
schedule[cache_b0].compute_at(schedule[result_c], axis0)
schedule[result_c].bind(axis0, te.thread_axis("blockIdx.x"))
schedule[result_c].bind(axis1, te.thread_axis("threadIdx.x"))
# one line to build the function.
def check_device(device, host="stackvm"):
if not tvm.testing.device_enabled(host):
return
dev = tvm.device(device, 0)
if not tvm.testing.device_enabled(device):
return
func = tvm.build(
schedule,
[placeholder_a0, placeholder_a1, result_c],
device,
host,
name="multiple_cache_write",
)
dev = tvm.device(device, 0)
# launch the kernel.
buff_a0 = tvm.nd.array(np.random.uniform(size=arr_length).astype(placeholder_a0.dtype), dev)
buff_a1 = tvm.nd.array(np.random.uniform(size=arr_length).astype(placeholder_a1.dtype), dev)
buff_c = tvm.nd.array(np.zeros(arr_length, dtype=result_c.dtype), dev)
func(buff_a0, buff_a1, buff_c)
tvm.testing.assert_allclose(
buff_c.numpy(),
buff_a0.numpy() + buff_a1.numpy() + (buff_a0.numpy() * buff_a1.numpy()),
rtol=1e-5,
)
check_device("cuda", "llvm")
check_device("vulkan")
check_device("opencl")
def test_log_pow_llvm():
"""Test log pow using llvm to lower."""
# graph
size_var_n = te.size_var("n")
placeholder_a = te.placeholder((size_var_n,), name="A")
result_b = te.compute(
placeholder_a.shape, lambda *i: te.power(te.log(placeholder_a(*i)), 2.0), name="B"
)
schedule = te.create_schedule(result_b.op)
# create iter var and assign them tags.
schedule[result_b].split(result_b.op.axis[0], factor=32)
# one line to build the function.
if not tvm.testing.device_enabled("llvm"):
return
flog = tvm.build(schedule, [placeholder_a, result_b], "llvm", name="mylog")
dev = tvm.cpu(0)
# launch the kernel.
size_var_n = 1028
buff_a = tvm.nd.array(np.random.uniform(size=size_var_n).astype(placeholder_a.dtype), dev)
buff_b = tvm.nd.array(np.zeros(size_var_n, dtype=result_b.dtype), dev)
repeat = 10
ftimer = flog.time_evaluator(flog.entry_name, dev, number=1, repeat=repeat)
res = ftimer(buff_a, buff_b)
assert len(res.results) == repeat
tvm.testing.assert_allclose(buff_b.numpy(), np.power(np.log(buff_a.numpy()), 2.0), rtol=1e-5)
@tvm.testing.uses_gpu
def test_popcount():
"""Test popcount."""
def run(dtype):
# graph
arr_length = 1024
arr_length_tvm = tvm.runtime.convert(1024)
placeholder_a = te.placeholder((arr_length_tvm,), name="A", dtype=dtype)
placeholder_b = te.compute(
placeholder_a.shape, lambda *i: tvm.tir.popcount(placeholder_a(*i)), name="B"
)
schedule = te.create_schedule(placeholder_b.op)
# simple schedule
num_thread = 8
axis1, axis2 = schedule[placeholder_b].split(placeholder_b.op.axis[0], factor=num_thread)
def check_device(device):
dev = tvm.device(device, 0)
if not tvm.testing.device_enabled(device):
print("skip because %s is not enabled.." % device)
return
target = tvm.target.Target(device)
if "cpu" not in target.keys:
schedule[placeholder_b].bind(axis1, te.thread_axis("blockIdx.x"))
schedule[placeholder_b].bind(axis2, te.thread_axis("threadIdx.x"))
func = tvm.build(schedule, [placeholder_a, placeholder_b], device)
# launch the kernel.
buff_a = tvm.nd.array(
np.random.randint(low=0, high=1000, size=arr_length, dtype=placeholder_a.dtype), dev
)
buff_b = tvm.nd.array(np.zeros(shape=arr_length, dtype=placeholder_b.dtype), dev)
func(buff_a, buff_b)
tvm.testing.assert_allclose(
buff_b.numpy(), list(map(lambda x: bin(x).count("1"), buff_a.numpy())), rtol=1e-5
)
check_device("llvm")
check_device("cuda")
check_device("opencl")
if dtype == "uint32":
check_device("metal")
check_device("vulkan")
run("uint32")
run("uint64")
@tvm.testing.requires_gpu
def test_add():
"""Test addition."""
def run(dtype):
# graph
size_var_n = te.size_var("n")
placeholder_a = te.placeholder((size_var_n,), name="A", dtype=dtype)
placeholder_b = te.placeholder((size_var_n,), name="B", dtype=dtype)
result_c = te.compute(
placeholder_a.shape, lambda *i: placeholder_a(*i) + placeholder_b(*i), name="C"
)
# schedule
schedule = te.create_schedule(result_c.op)
# create iter var and assign them tags.
num_thread = 16
axis_bx, axis_x = schedule[result_c].split(result_c.op.axis[0], factor=num_thread * 4)
axis_tx, axis_x = schedule[result_c].split(axis_x, nparts=num_thread)
_, axis_x = schedule[result_c].split(axis_x, factor=4)
schedule[result_c].bind(axis_bx, te.thread_axis("blockIdx.x"))
schedule[result_c].bind(axis_tx, te.thread_axis("threadIdx.x"))
schedule[result_c].vectorize(axis_x)
# one line to build the function.
def check_device(device):
dev = tvm.device(device, 0)
if not tvm.testing.device_enabled(device):
print("skip because %s is not enabled.." % device)
return
fadd = tvm.build(
schedule, [placeholder_a, placeholder_b, result_c], device, name="myadd"
)
# launch the kernel.
n = 1024
buff_a = tvm.nd.array(
(np.random.uniform(size=n) * 256).astype(placeholder_a.dtype), dev
)
buff_b = tvm.nd.array(
(np.random.uniform(size=n) * 256).astype(placeholder_b.dtype), dev
)
buff_c = tvm.nd.array(np.zeros(n, dtype=result_c.dtype), dev)
ftimer = fadd.time_evaluator(fadd.entry_name, dev, number=1)
_ = ftimer(buff_a, buff_b, buff_c).mean
tvm.testing.assert_allclose(buff_c.numpy(), buff_a.numpy() + buff_b.numpy(), rtol=1e-6)
check_device("opencl")
check_device("cuda")
if dtype == "float32":
check_device("metal")
check_device("vulkan")
run("float32")
run("int32")
run("int64")
run("uint64")
@tvm.testing.requires_gpu
def try_warp_memory():
"""Test using warp memory
skip this in default test because it require higher arch"""
arr_size = 128
placeholder_a = te.placeholder((arr_size,), name="A")
result_b = te.compute((arr_size,), lambda i: placeholder_a[i] + 3, name="B")
warp_size = 32
schedule = te.create_schedule(result_b.op)
cache_read_aa = schedule.cache_read(placeholder_a, "warp", [result_b])
axis_x0, axis_xi = schedule[result_b].split(result_b.op.axis[0], warp_size * 2)
_, axis_xi1 = schedule[result_b].split(axis_xi, factor=warp_size)
thread_axis_tx = te.thread_axis("threadIdx.x")
schedule[result_b].bind(axis_xi1, thread_axis_tx)
schedule[result_b].bind(axis_x0, te.thread_axis("blockIdx.x"))
schedule[cache_read_aa].compute_at(schedule[result_b], axis_x0)
axis_x0, axis_xi = schedule[cache_read_aa].split(schedule[cache_read_aa].op.axis[0], warp_size)
schedule[cache_read_aa].bind(axis_xi, thread_axis_tx)
@tvm.register_func("tvm_callback_cuda_compile", override=True)
def tvm_callback_cuda_compile(code, _): # pylint: disable=unused-variable
ptx = nvcc.compile_cuda(code)
return ptx
# one line to build the function.
def check_device(device):
dev = tvm.device(device, 0)
if not tvm.testing.device_enabled(device):
print("skip because %s is not enabled.." % device)
return
myfunc = tvm.build(schedule, [placeholder_a, result_b], device)
buff_a = tvm.nd.array(
(np.random.uniform(size=arr_size) * 256).astype(placeholder_a.dtype), dev
)
buff_b = tvm.nd.array(np.zeros(arr_size, dtype=result_b.dtype), dev)
myfunc(buff_a, buff_b)
tvm.testing.assert_allclose(buff_b.numpy(), buff_a.numpy() + 3, rtol=1e-6)
check_device("cuda")
if __name__ == "__main__":
test_exp()
try_warp_memory()
test_multiple_cache_write()
test_add()
test_log_pow_llvm()
test_popcount()
test_fmod()