# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import numpy as np
import tvm
import tvm.testing
from tvm import te
from tvm.topi.nn.pooling import pool2d


def test_tensor():
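    """A te.compute result exposes its symbolic shape, equals op.output(0), and supports astype."""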
m = te.size_var("m")
n = te.size_var("n")
l = te.size_var("l")
A = te.placeholder((m, l), name="A")
B = te.placeholder((n, l), name="B")
T = te.compute((m, n, l), lambda i, j, k: A[i, k] * B[j, k])
print(T)
print(T.op.body)
assert tuple(T.shape) == (m, n, l)
assert isinstance(A.op, tvm.te.PlaceholderOp)
assert A == A
assert T.op.output(0) == T
assert T.op.output(0).__hash__() == T.__hash__()
d = {T.op.output(0): 1}
assert d[T] == 1
assert T[0][0][0].astype("float16").dtype == "float16"


def test_rank_zero():
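    """Rank-0 tensors: a 0-d placeholder is read as scale(), and a reduction yields shape ()."""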
m = te.size_var("m")
A = te.placeholder((m,), name="A")
scale = te.placeholder((), name="s")
k = te.reduce_axis((0, m), name="k")
T = te.compute((), lambda: te.sum(A[k] * scale(), axis=k))
print(T)
print(T.op.body)
assert tuple(T.shape) == ()


def test_conv1d():
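    """A three-tap stencil over a padded 1-D input, written with a named compute function."""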
n = te.size_var("n")
    A = te.placeholder((n + 2,), name="A")

    def computeB(ii):
i = ii + 1
return A[i - 1] + A[i] + A[i + 1]

    B = te.compute((n,), computeB)


def test_tensor_slice():
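    """Chained [0][i] indexing on a 2-D compute tensor works inside another compute."""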
n = te.size_var("n")
A = te.compute((n, n), lambda i, j: 1)
B = te.compute((n,), lambda i: A[0][i] + A[0][i])


def test_tensor_reduce_multi_axis():
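    """te.sum accepts multiple reduce axes given as either a tuple or a list."""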
m = te.size_var("m")
n = te.size_var("n")
A = te.placeholder((m, n), name="A")
k1 = te.reduce_axis((0, n), "k")
k2 = te.reduce_axis((0, m), "k")
    C = te.compute((1,), lambda _: te.sum(A[k1, k2], axis=(k1, k2)))  # axes as a tuple
    C = te.compute((1,), lambda _: te.sum(A[k1, k2], axis=[k1, k2]))  # axes as a list


def test_tensor_comm_reducer():
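    """A commutative reducer built with te.comm_reducer is usable like te.sum."""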
m = te.size_var("m")
n = te.size_var("n")
A = te.placeholder((m, n), name="A")
k = te.reduce_axis((0, n), "k")
mysum = te.comm_reducer(lambda x, y: x + y, lambda t: tvm.tir.const(0, dtype=t))
C = te.compute((m,), lambda i: mysum(A[i, k], axis=k))


def test_tensor_comm_reducer_overload():
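    """Called on plain expressions rather than a reduce axis, a comm_reducer acts as a binary op."""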
m = te.size_var("m")
n = te.size_var("n")
mysum = te.comm_reducer(lambda x, y: x + y, lambda t: tvm.tir.const(0, dtype=t))
sum_res = mysum(m, n)


def test_tensor_reduce():
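    """Reduce over an intermediate compute tensor and round-trip the result through JSON."""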
m = te.size_var("m")
n = te.size_var("n")
l = te.size_var("l")
A = te.placeholder((m, l), name="A")
B = te.placeholder((n, l), name="B")
T = te.compute((m, n, l), lambda i, j, k: A[i, k] * B[j, k])
rv = te.reduce_axis((0, A.shape[1]), "k")
C = te.compute((m, n), lambda i, j: te.sum(T(i, j, rv + 1), axis=rv))

    # Check that the reduce compute survives a JSON save/load round-trip.
C_json = tvm.ir.save_json(C)
C_loaded = tvm.ir.load_json(C_json)
assert isinstance(C_loaded, te.tensor.Tensor)
assert str(C_loaded) == str(C)


def test_tensor_reduce_multiout_with_cond():
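    """A two-output comm_reducer with a where-condition restricting the reduction domain."""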

    def fcombine(x, y):
return x[0] + y[0], x[1] + y[1]

    def fidentity(t0, t1):
return tvm.tir.const(0, t0), tvm.tir.const(1, t1)
mysum = te.comm_reducer(fcombine, fidentity, name="mysum")
m = te.var("m")
n = te.var("n")
idx = te.placeholder((m, n), name="idx", dtype="int32")
val = te.placeholder((m, n), name="val", dtype="int32")
k = te.reduce_axis((0, n), "k")
cond = te.floormod(k, 2) == 0
T0, T1 = te.compute((m,), lambda i: mysum((idx[i, k], val[i, k]), axis=k, where=cond), name="T")


def test_tensor_compute1():
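    """Tensorize an elementwise add via decl_tensor_intrin; lowering yields an extern-call Evaluate."""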
m = 1024
factor = 16
dtype = "float32"

    def intrin_vadd(n):
x = te.placeholder((n,))
y = te.placeholder((n,))
z = te.compute(x.shape, lambda i: x[i] + y[i])

        def intrin_func(ins, outs):
ib = tvm.tir.ir_builder.create()
ib.emit(
tvm.tir.call_extern(
outs[0].dtype,
"vadd",
ins[0].access_ptr("r"),
ins[1].access_ptr("r"),
outs[0].access_ptr("wr"),
)
)
return ib.get()
return te.decl_tensor_intrin(z.op, intrin_func, default_buffer_params={"offset_factor": n})
vadd = intrin_vadd(factor)
A = te.placeholder((m // factor, factor), name="A", dtype=dtype)
B = te.placeholder((m // factor, factor), name="B", dtype=dtype)
C = te.compute((m // factor, factor), lambda i: vadd(A[i, 0:factor], B[i, 0:factor]))
s = te.create_schedule(C.op)

    # Lower with the CSE pass disabled; otherwise it would common up the repeated
    # buffer accesses and change the structure checked below.
with tvm.transform.PassContext(opt_level=3, disabled_pass=["tir.CommonSubexprElimTIR"]):
stmt = tvm.lower(s, [A, B, C])["main"].body
assert isinstance(stmt.body, tvm.tir.Evaluate)


def test_tensor_compute2():
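    """Tensorize a blocked GEMM with a body/reset/update intrinsic and check the lowered Evaluates."""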
M = 2048
N = 1024
L = 1024
factor = 16
factor1 = 32
factor2 = 32
dtype = "float32"

    def intrin_gemm(m, n, l):
k = te.reduce_axis((0, l))
x = te.placeholder((m, l))
y = te.placeholder((n, l))
        # z only declares the intrinsic's semantics; intrin_func supplies the implementation.
z = te.compute((m, n), lambda i, j: te.sum(x[i][k] * y[j][k], axis=k))

        def intrin_func(ins, outs):
x_ptr = ins[0].access_ptr("r")
y_ptr = ins[1].access_ptr("r")
z_ptr = outs[0].access_ptr("w")
body = tvm.tir.call_packed("gemv", x_ptr, y_ptr, z_ptr, m, n, l)
reset = tvm.tir.call_packed("fill_zero", z_ptr, m, n)
update = tvm.tir.call_packed("gemv_add", x_ptr, y_ptr, z_ptr, m, n, l)
return body, reset, update
return te.decl_tensor_intrin(z.op, intrin_func, default_buffer_params={"offset_factor": n})
vgemm = intrin_gemm(factor1, factor2, factor)
A = te.placeholder((M // factor1, L // factor, factor1, factor), name="A", dtype=dtype)
B = te.placeholder((N // factor2, L // factor, factor2, factor), name="B", dtype=dtype)
k = te.reduce_axis((0, L // factor), name="k")
C = te.compute(
(M // factor1, N // factor2, factor1, factor2),
lambda i, j: vgemm(
A[i, k, 0:factor1, 0:factor], B[j, k, 0:factor2, 0:factor], reduce_axis=k
),
)
s = te.create_schedule(C.op)

    # Lower with the CSE pass disabled; otherwise it would common up the repeated
    # buffer accesses and change the structure checked below.
with tvm.transform.PassContext(opt_level=3, disabled_pass=["tir.CommonSubexprElimTIR"]):
stmt = tvm.lower(s, [A, B, C])["main"].body
assert isinstance(stmt.body.body[0], tvm.tir.Evaluate)
assert isinstance(stmt.body.body[1].body, tvm.tir.Evaluate)


def test_tensor_scan():
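    """te.scan builds a cumulative sum along the first axis while keeping the (m, n) shape."""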
m = te.size_var("m")
n = te.size_var("n")
x = te.placeholder((m, n))
s = te.placeholder((m, n))
res = tvm.te.scan(
te.compute((1, n), lambda _, i: x[0, i]),
te.compute((m, n), lambda t, i: s[t - 1, i] + x[t, i]),
s,
)
assert tuple(res.shape) == (m, n)


def test_scan_multi_out():
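    """A scan with two coupled states yields outputs with distinct value_index; the op survives JSON."""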
m = te.size_var("m")
n = te.size_var("n")
x1 = te.placeholder((m, n))
s1 = te.placeholder((m, n))
x2 = te.placeholder((m, n))
s2 = te.placeholder((m, n))
s1_init = te.compute((1, n), lambda _, i: x1[0, i])
s2_init = te.compute((1, n), lambda _, i: x2[0, i])
s1_update = te.compute((m, n), lambda t, i: s1[t - 1, i] + s2[t - 1, i] + x1[t, i])
s2_update = te.compute((m, n), lambda t, i: x2[t, i] + s2[t - 1, i])
r0, r1 = tvm.te.scan([s1_init, s2_init], [s1_update, s2_update], [s1, s2])
assert r0.value_index == 0
assert r1.value_index == 1
json_str = tvm.ir.save_json(r0.op)
zz = tvm.ir.load_json(json_str)
assert isinstance(zz, tvm.te.ScanOp)


def test_extern():
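    """te.extern wraps a packed call; the callback receives Buffer objects for its operands."""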
m = te.size_var("m")
A = te.placeholder((m,), name="A")

    def extern_func(ins, outs):
assert isinstance(ins[0], tvm.te.schedule.Buffer)
return tvm.tir.call_packed("myadd", ins[0].data, outs[0].data, m)
B = te.extern((m,), [A], extern_func)
assert tuple(B.shape) == (m,)


def test_extern_multi_out():
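    """te.extern with two output shapes returns one tensor per output, each with its own value_index."""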
m = te.size_var("m")
A = te.placeholder((m,), name="A")
B = te.compute((m,), lambda i: A[i] * 10)

    def extern_func(ins, outs):
assert isinstance(ins[0], tvm.te.schedule.Buffer)
return tvm.tir.call_packed("myadd", ins[0].data, outs[0].data, outs[1].data, m)
res = te.extern([A.shape, A.shape], [A, B], extern_func)
assert len(res) == 2
assert res[1].value_index == 1


def test_tuple_inputs():
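    """A tuple-valued compute yields sibling tensors sharing one op, differing only in value_index."""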
m = te.size_var("m")
n = te.size_var("n")
A0 = te.placeholder((m, n), name="A0")
A1 = te.placeholder((m, n), name="A1")
T0, T1 = te.compute((m, n), lambda i, j: (A0[i, j] * 2, A1[i, j] * 3), name="T")
s = te.create_schedule(T0.op)
for i in range(len(T0.shape)):
assert T0.shape[i] == T1.shape[i]
assert T0.op == T1.op
assert T0.value_index == 0
assert T1.value_index == 1


def test_tuple_with_different_deps():
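    """compute_at on one member of a tuple compute must still realize the sibling output exactly once."""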
m = te.size_var("m")
n = te.size_var("n")
    A0 = te.placeholder((m, n), name="A0")
    A1 = te.placeholder((m, n), name="A1")
B0, B1 = te.compute((m, n), lambda i, j: (A0[i, j] * 2, A1[i, j] * 3), name="B")
C = te.compute((m, n), lambda i, j: B0[i, j] + 4, name="C")
s = te.create_schedule(C.op)
xo, xi = s[C].split(C.op.axis[0], factor=10)
s[B0.op].compute_at(s[C], xo)
sch = s.normalize()
bounds = tvm.te.schedule.InferBound(sch)
stmt = tvm.te.schedule.ScheduleOps(sch, bounds)

    def get_B1_realize(x):
if (
isinstance(x, tvm.tir.ProducerRealize)
and x.producer.op == B1.op
and x.producer.value_index == 1
):
ret.append(x)

    ret = []  # filled in by get_B1_realize during the visit below
tvm.tir.stmt_functor.post_order_visit(stmt, get_B1_realize)
assert stmt.producer == C and len(ret) == 1


def test_tensor_inputs():
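    """op.input_tensors records the placeholder feeding a compute exactly once."""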
x = te.placeholder((1,), name="x")
y = te.compute(x.shape, lambda i: x[i] + x[i])
assert tuple(y.op.input_tensors) == (x,)


def test_tensor_pool():
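    """Tensorize a topi max pool from the channel axis inward with a packed-call intrinsic."""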

    def intrin_pool():
A = te.placeholder((64, 16, 16), name="A")
kh = te.reduce_axis((0, 3), name="kh")
kw = te.reduce_axis((0, 3), name="kw")
P = te.compute(
(64, 14, 14),
lambda c, oh, ow: tvm.te.max(A[c, oh + kh, ow + kw], axis=[kh, kw]),
name="p",
)

        def intrin_func(ins, outs):
dinp = ins[0]
dout = outs[0]
return tvm.tir.call_packed("op", dinp, dout)
return te.decl_tensor_intrin(P.op, intrin_func, default_buffer_params={"offset_factor": 1})
A = te.placeholder((1, 64, 16, 16), name="A")
P = pool2d(
data=A, kernel=(3, 3), stride=(1, 1), dilation=(1, 1), padding=(0, 0, 0, 0), pool_type="max"
)
s = te.create_schedule(P.op)
_, oh, _, _ = P.op.axis
intrin = intrin_pool()
s[P].tensorize(oh, intrin)
tvm.lower(s, [A, P])


def test_tensor_scalar_mixed():
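    """Pass a rank-1 tensor and a rank-0 scalar together through te.extern into a packed func."""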
    # A rank-1 input, a rank-0 scalar multiplier, and an output buffer.
a = np.array(np.random.uniform(size=(10,)), "float32")
    b = np.array(np.random.uniform(size=(1,))[0], "float32")
c = np.array(np.random.uniform(size=(10,)), "float32")
@tvm.register_func("tvm.test_tensor_scalar_scale")
def my_scale(tensor, scalar, out):
out_np = tensor.numpy() * scalar.numpy()
tvm.nd.array(out_np).copyto(out)
A = te.placeholder(a.shape, name="A")
B = te.placeholder(b.shape, name="B")
C = te.extern(
a.shape,
[A, B],
lambda ins, outs: tvm.tir.call_packed(
"tvm.test_tensor_scalar_scale", ins[0], ins[1], outs[0]
),
name="C",
)
s = te.create_schedule(C.op)
f = tvm.build(s, [A, B, C], "llvm")
ta = tvm.nd.array(a)
tb = tvm.nd.array(b)
tc = tvm.nd.array(c)
f(ta, tb, tc)
tvm.testing.assert_allclose(a * b, tc.numpy())


def test_tensor_scalar():
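    """Rank-0 (scalar-shaped) tensors flow through te.extern, tvm.build, and execution."""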
    # A rank-0 input value and a rank-0 output buffer.
    a = np.array(np.random.uniform(size=(1,))[0], "float32")
b = np.array(0.0, "float32")
@tvm.register_func("tvm.test_tensor_scalar_copy")
def mycopy(x, y):
x.copyto(y)
A = te.placeholder(a.shape, name="A")
B = te.extern(
a.shape,
[A],
lambda ins, outs: tvm.tir.call_packed("tvm.test_tensor_scalar_copy", ins[0], outs[0]),
name="B",
)
s = te.create_schedule(B.op)
f = tvm.build(s, [A, B], "llvm")
ta = tvm.nd.array(a)
tb = tvm.nd.array(b)
f(ta, tb)
tvm.testing.assert_allclose(ta.numpy(), tb.numpy())


if __name__ == "__main__":
test_tensor()
test_rank_zero()
test_conv1d()
test_tensor_slice()
test_tensor_reduce_multi_axis()
test_tensor_comm_reducer()
test_tensor_comm_reducer_overload()
test_tensor_reduce()
test_tensor_reduce_multiout_with_cond()
test_tensor_compute1()
test_tensor_compute2()
test_tensor_scan()
test_scan_multi_out()
test_extern()
test_extern_multi_out()
test_tuple_inputs()
test_tuple_with_different_deps()
test_tensor_inputs()
test_tensor_pool()
test_tensor_scalar_mixed()
test_tensor_scalar()