| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import tvm |
| from tvm import te |
| import numpy as np |
| import tvm.testing |
| |
| |
@tvm.testing.uses_gpu
def test_add_pipeline():
    """Check a te.extern stage with hand-written IR computing A + 1.

    Two generators are exercised: a plain sequential loop for CPU targets
    and a blockIdx/threadIdx-bound kernel for GPU targets (opencl, cuda).
    Both store one float32x2 vector per step, so only (n + 1) // 2 stores
    are needed to cover the n-element output.
    """
    nn = 64
    max_threads = 4
    n = tvm.runtime.convert(nn)
    A = te.placeholder((n,), name="A")

    def extern_generator(ins, outs):
        """Manually write the IR for the extern function, add pipeline.

        Each iteration stores one float32x2 vector at element offset
        i * 2, so (n + 1) // 2 iterations cover all n elements.
        """
        ib = tvm.tir.ir_builder.create()
        with ib.for_range(0, (n + 1) // 2) as i:
            ib.emit(
                outs[0].vstore(
                    i * 2, ins[0].vload(i * 2, "float32x2") + tvm.tir.const(1, "float32x2")
                )
            )
        return ib.get()

    def extern_generator_gpu(ins, outs):
        """Manually write the IR for the extern function, add pipeline.

        Each thread stores one float32x2 vector at element offset
        idx * 2, so only the first (n + 1) // 2 threads may store.
        """
        ib = tvm.tir.ir_builder.create()
        bx = te.thread_axis("blockIdx.x")
        tx = te.thread_axis("threadIdx.x")
        ib.scope_attr(bx, "thread_extent", (nn + max_threads - 1) // max_threads)
        ib.scope_attr(tx, "thread_extent", max_threads)
        idx = bx.var * max_threads + tx.var
        # Fix: bound the *vector* index, not the element index.  Guarding
        # with `idx < n` would let threads (n+1)//2 .. n-1 store a
        # float32x2 at offsets up to 2*(n-1), past the end of the
        # n-element output buffer.
        with ib.if_scope(ib.likely(idx < (n + 1) // 2)):
            ib.emit(
                outs[0].vstore(
                    idx * 2, ins[0].vload(idx * 2, "float32x2") + tvm.tir.const(1, "float32x2")
                )
            )
        return ib.get()

    C_cpu = te.extern(A.shape, [A], extern_generator, name="C")
    C_gpu = te.extern(A.shape, [A], extern_generator_gpu, name="C")
    s_cpu = te.create_schedule(C_cpu.op)
    s_gpu = te.create_schedule(C_gpu.op)
    print(tvm.lower(s_cpu, [A, C_cpu], simple_mode=True))
    print(tvm.lower(s_gpu, [A, C_gpu], simple_mode=True))

    def check_target(target):
        # Skip targets that are not enabled in this build/environment.
        if not tvm.testing.device_enabled(target):
            return
        s = s_gpu if target in ["opencl", "cuda"] else s_cpu
        C = C_gpu if target in ["opencl", "cuda"] else C_cpu
        # build and invoke the kernel.
        f = tvm.build(s, [A, C], target)
        dev = tvm.device(target, 0)
        # launch the kernel.
        n = nn
        a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)
        c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)
        f(a, c)
        tvm.testing.assert_allclose(c.numpy(), a.numpy() + 1)

    check_target("llvm")
    check_target("opencl")
    check_target("cuda")
| |
| |
def test_pack_buffer_simple():
    """An extern stage that forwards its input via a packed-func call."""
    num_elems = 1024
    n = tvm.runtime.convert(num_elems)
    A = te.placeholder((n,), name="A")

    def extern_generator(ins, outs):
        """Emit a call_packed handing input and output buffers to the
        globally registered function."""
        return tvm.tir.call_packed("my_extern_array_func1", ins[0], outs[0])

    C = te.extern(A.shape, [A], extern_generator, name="C")
    s = te.create_schedule(C.op)

    @tvm.register_func
    def my_extern_array_func1(aa, bb):
        aa.copyto(bb)

    def check_target(target):
        # Skip targets that are not enabled in this build/environment.
        if not tvm.testing.device_enabled(target):
            return
        # Build the kernel and run it on the host CPU.
        f = tvm.build(s, [A, C], target)
        dev = tvm.cpu(0)
        a = tvm.nd.array(np.random.uniform(size=num_elems).astype(A.dtype), dev)
        c = tvm.nd.array(np.zeros(num_elems, dtype=C.dtype), dev)

        f(a, c)
        # The registered func is a plain copy, so output equals input.
        tvm.testing.assert_allclose(c.numpy(), a.numpy())

    for tgt in ("stackvm", "llvm"):
        check_target(tgt)
| |
| |
def test_pack_buffer_intermediate():
    """A packed-func extern stage fed by an intermediate compute stage."""
    num_elems = 1024
    n = tvm.runtime.convert(num_elems)
    A = te.placeholder((n,), name="A")
    B = te.compute((n,), lambda i: A[i] + 1, name="B")

    def extern_generator(ins, outs):
        """Emit a call_packed handing the intermediate buffer and the
        output buffer to the registered function."""
        return tvm.tir.call_packed("my_extern_array_func2", ins[0], outs[0])

    C = te.extern(B.shape, [B], extern_generator, name="C")
    s = te.create_schedule(C.op)

    def check_target(target):
        # Skip targets that are not enabled in this build/environment.
        if not tvm.testing.device_enabled(target):
            return
        # Build the kernel and run it on the host CPU.
        f = tvm.build(s, [A, C], target)
        dev = tvm.cpu(0)
        a = tvm.nd.array(np.random.uniform(size=num_elems).astype(A.dtype), dev)
        c = tvm.nd.array(np.zeros(num_elems, dtype=C.dtype), dev)

        @tvm.register_func
        def my_extern_array_func2(aa, bb):
            # The extern stage receives the intermediate B = A + 1.
            assert aa.shape == a.shape
            tvm.testing.assert_allclose(aa.numpy(), a.numpy() + 1)
            aa.copyto(bb)

        f(a, c)
        tvm.testing.assert_allclose(c.numpy(), a.numpy() + 1)

    check_target("llvm")
| |
| |
if __name__ == "__main__":
    # Run every test case in order when invoked as a script.
    for _case in (test_pack_buffer_simple, test_pack_buffer_intermediate, test_add_pipeline):
        _case()