# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit test VTA's instructions """
import tvm
from tvm import te
import numpy as np
import topi
from tvm.contrib import util
import vta
import vta.testing
from vta.testing import simulator
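# Fix the RNG seed so the randomized test data is reproducible across runs.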
np.random.seed(0xdeadb)


def test_save_load_out():
"""Test save/store output command"""
def _run(env, remote):
n = 6
x = te.placeholder(
(n, n, env.BATCH, env.BLOCK_OUT),
name="x",
dtype=env.acc_dtype)
x_buf = te.compute(
(n, n, env.BATCH, env.BLOCK_OUT),
lambda *i: x(*i), "x_buf")
# insert no-op that won't be optimized away
y_buf = te.compute(
(n, n, env.BATCH, env.BLOCK_OUT),
lambda *i: x_buf(*i)>>0, "y_buf")
y = te.compute(
(n, n, env.BATCH, env.BLOCK_OUT),
lambda *i: y_buf(*i).astype(env.inp_dtype), "y")
# schedule
s = te.create_schedule(y.op)
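        # Stage x through the accumulator SRAM (acc_scope); the dma_copy
        # pragmas lower to VTA load/store instructions and the alu pragma
        # turns the no-op shift into a VTA ALU instruction.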
s[x_buf].set_scope(env.acc_scope)
s[x_buf].pragma(x_buf.op.axis[0], env.dma_copy)
s[y_buf].set_scope(env.acc_scope)
s[y_buf].pragma(y_buf.op.axis[0], env.alu)
s[y].pragma(y.op.axis[0], env.dma_copy)
# verification
with vta.build_config():
m = vta.build(s, [x, y], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
m.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
x_np = np.random.randint(
1, 10, size=(n, n, env.BATCH, env.BLOCK_OUT)).astype(x.dtype)
y_np = x_np.astype(y.dtype)
x_nd = tvm.nd.array(x_np, ctx)
y_nd = tvm.nd.empty(y_np.shape, ctx=ctx, dtype=y_np.dtype)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(x_nd, y_nd)
np.testing.assert_equal(y_np, y_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Save load execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
    vta.testing.run(_run)


def test_padded_load():
"""Test padded load."""
def _run(env, remote):
def check_padded_load(pad_before, pad_after, test_name=None):
# declare
n = 3
m = 5
x = te.placeholder(
(n, m, env.BATCH, env.BLOCK_OUT),
name="x",
dtype=env.acc_dtype)
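            # topi.nn.pad pads each of the four dimensions by pad_before /
            # pad_after; VTA can express this padding directly in its DMA
            # load instruction, which is what this test exercises.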
            x_buf = topi.nn.pad(x, pad_before, pad_after, name="x_buf")
# insert no-op that won't be optimized away
y_buf = te.compute((n + pad_before[0] + pad_after[0],
m + pad_before[1] + pad_after[1],
env.BATCH,
env.BLOCK_OUT), lambda *i: x_buf(*i)>>0, "y_buf")
y = te.compute((n + pad_before[0] + pad_after[0],
m + pad_before[1] + pad_after[1],
env.BATCH,
env.BLOCK_OUT), lambda *i: y_buf(*i).astype(env.inp_dtype), "y")
# schedule
s = te.create_schedule(y.op)
s[x_buf].set_scope(env.acc_scope)
s[x_buf].pragma(x_buf.op.axis[0], env.dma_copy)
s[y_buf].set_scope(env.acc_scope)
s[y_buf].pragma(y_buf.op.axis[0], env.alu)
s[y].pragma(y.op.axis[0], env.dma_copy)
# build
with vta.build_config():
mod = vta.build(s, [x, y], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
mod.save(temp.relpath("padded_load.o"))
remote.upload(temp.relpath("padded_load.o"))
f = remote.load_module("padded_load.o")
# verify
ctx = remote.ext_dev(0)
x_np = np.random.randint(0, 10, size=(
n, m, env.BATCH, env.BLOCK_OUT)).astype(x.dtype)
y_np = np.zeros((n + pad_before[0] + pad_after[0],
m + pad_before[1] + pad_after[1],
env.BATCH,
env.BLOCK_OUT)).astype(y.dtype)
y_np[pad_before[0]:pad_before[0] + n,
pad_before[1]:pad_before[1] + m,
:] = x_np
x_nd = tvm.nd.array(x_np, ctx)
y_nd = tvm.nd.empty(y_np.shape, ctx=ctx, dtype=y_np.dtype)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(x_nd, y_nd)
np.testing.assert_equal(y_np, y_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Padded {} load execution statistics:".format(test_name))
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
check_padded_load([2, 0, 0, 0], [0, 0, 0, 0], test_name="Y0")
check_padded_load([0, 2, 0, 0], [0, 0, 0, 0], test_name="Y1")
check_padded_load([0, 0, 0, 0], [2, 0, 0, 0], test_name="X0")
check_padded_load([0, 0, 0, 0], [0, 2, 0, 0], test_name="X1")
check_padded_load([1, 1, 0, 0], [1, 1, 0, 0], test_name="all")
    vta.testing.run(_run)


def test_gemm():
"""Test GEMM."""
def _run(env, remote):
# declare
o = 4
n = 1
m = 4
x = te.placeholder((o, n, env.BATCH, env.BLOCK_IN), name="x", dtype=env.inp_dtype)
w = te.placeholder((m, n, env.BLOCK_OUT, env.BLOCK_IN), name="w", dtype=env.wgt_dtype)
x_buf = te.compute((o, n, env.BATCH, env.BLOCK_IN), lambda *i: x(*i), "x_buf")
w_buf = te.compute((m, n, env.BLOCK_OUT, env.BLOCK_IN), lambda *i: w(*i), "w_buf")
ko = te.reduce_axis((0, n), name="ko")
ki = te.reduce_axis((0, env.BLOCK_IN), name="ki")
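        # Blocked GEMM: x is (batch_outer, k_outer, BATCH, BLOCK_IN) and w is
        # (out_outer, k_outer, BLOCK_OUT, BLOCK_IN); ko and ki reduce over the
        # outer and inner parts of the contraction axis.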
y_gem = te.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda bo, co, bi, ci:
te.sum(x_buf[bo, ko, bi, ki].astype(env.acc_dtype) *
w_buf[co, ko, ci, ki].astype(env.acc_dtype),
axis=[ko, ki]),
name="y_gem")
y_shf = te.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: y_gem(*i)>>8,
name="y_shf")
y_max = te.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm.te.max(y_shf(*i), 0),
"y_max") #relu
y_min = te.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm.te.min(y_max(*i), (1<<(env.INP_WIDTH-1))-1),
"y_min") #relu
y = te.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: y_min(*i).astype(env.inp_dtype),
name="y")
if not remote:
return
def verify(s, name=None):
mod = vta.build(s, [x, w, y], "ext_dev", env.target_host)
temp = util.tempdir()
mod.save(temp.relpath("gemm.o"))
remote.upload(temp.relpath("gemm.o"))
f = remote.load_module("gemm.o")
# verify
ctx = remote.ext_dev(0)
x_np = np.random.randint(
-128, 128, size=(o, n, env.BATCH, env.BLOCK_IN)).astype(x.dtype)
w_np = np.random.randint(
-128, 128, size=(m, n, env.BLOCK_OUT, env.BLOCK_IN)).astype(w.dtype)
y_np = np.zeros((o, m, env.BATCH, env.BLOCK_OUT)).astype(y.dtype)
x_nd = tvm.nd.array(x_np, ctx)
w_nd = tvm.nd.array(w_np, ctx)
y_nd = tvm.nd.array(y_np, ctx)
y_np = y_np.astype(env.acc_dtype)
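            # Reference result: accumulate the blocked matmul in acc_dtype,
            # then apply the same >>8 shift and relu/saturation clipping as
            # the device pipeline above.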
for b in range(o):
for i in range(m):
for j in range(n):
y_np[b,i,:] += np.dot(x_np[b,j,:].astype(env.acc_dtype),
w_np[i,j].T.astype(env.acc_dtype))
y_np = np.right_shift(y_np, 8)
y_np = np.clip(y_np, 0, (1<<(env.INP_WIDTH-1))-1).astype(y.dtype)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(x_nd, w_nd, y_nd)
np.testing.assert_equal(y_np, y_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("GEMM schedule:{} execution statistics:".format(name))
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
def test_schedule1():
# default schedule with no smt
s = te.create_schedule(y.op)
# set the scope of the SRAM buffers
s[x_buf].set_scope(env.inp_scope)
s[w_buf].set_scope(env.wgt_scope)
s[y_gem].set_scope(env.acc_scope)
s[y_shf].set_scope(env.acc_scope)
s[y_max].set_scope(env.acc_scope)
s[y_min].set_scope(env.acc_scope)
# set pragmas for DMA transfer and ALU ops
s[x_buf].compute_at(s[y_gem], ko)
s[x_buf].pragma(s[x_buf].op.axis[0], env.dma_copy)
s[w_buf].compute_at(s[y_gem], ko)
s[w_buf].pragma(s[w_buf].op.axis[0], env.dma_copy)
s[y_shf].pragma(s[y_shf].op.axis[0], env.alu)
s[y_max].pragma(s[y_max].op.axis[0], env.alu)
s[y_min].pragma(s[y_min].op.axis[0], env.alu)
s[y].pragma(s[y].op.axis[0], env.dma_copy)
# tensorization
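            # Reorder so that ko is outermost and ki innermost; the inner
            # (BATCH, BLOCK_OUT, ki) region then matches the shape expected
            # by the env.gemm intrinsic.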
s[y_gem].reorder(
ko,
s[y_gem].op.axis[0],
s[y_gem].op.axis[1],
s[y_gem].op.axis[2],
s[y_gem].op.axis[3],
ki)
s[y_gem].tensorize(s[y_gem].op.axis[2], env.gemm)
verify(s, name="default")
def test_smt():
# test smt schedule
s = te.create_schedule(y.op)
s[x_buf].set_scope(env.inp_scope)
s[w_buf].set_scope(env.wgt_scope)
s[y_gem].set_scope(env.acc_scope)
s[y_shf].set_scope(env.acc_scope)
s[y_max].set_scope(env.acc_scope)
s[y_min].set_scope(env.acc_scope)
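            # Split the output along its outermost axis and bind it to a
            # virtual thread ("cthread"); VTA interleaves the two threads to
            # hide memory-access latency behind compute.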
abo, aco, abi, aci = s[y].op.axis
abo1, abo2 = s[y].split(abo, nparts=2)
s[y].bind(abo1, te.thread_axis("cthread"))
s[y_gem].compute_at(s[y], abo1)
s[y_shf].compute_at(s[y], abo1)
s[y_max].compute_at(s[y], abo1)
s[y_min].compute_at(s[y], abo1)
s[y_gem].reorder(
ko,
s[y_gem].op.axis[0],
s[y_gem].op.axis[1],
s[y_gem].op.axis[2],
s[y_gem].op.axis[3],
ki)
s[y_gem].tensorize(s[y_gem].op.axis[2], env.gemm)
s[y_shf].pragma(s[y_shf].op.axis[0], env.alu)
s[y_max].pragma(s[y_max].op.axis[0], env.alu)
s[y_min].pragma(s[y_min].op.axis[0], env.alu)
s[x_buf].compute_at(s[y_gem], ko)
s[x_buf].pragma(s[x_buf].op.axis[0], env.dma_copy)
s[w_buf].compute_at(s[y_gem], ko)
s[w_buf].pragma(s[w_buf].op.axis[0], env.dma_copy)
s[y].pragma(abo2, env.dma_copy)
verify(s, name="smt")
test_schedule1()
test_smt()
    vta.testing.run(_run)


def test_alu():
def _run(env, remote):
def check_alu(tvm_op, np_op=None, use_imm=False, test_name=None):
"""Test ALU"""
m = 8
n = 8
imm = np.random.randint(1,5)
# compute
a = te.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="a",
dtype=env.acc_dtype)
a_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a(*i),
"a_buf") #DRAM->SRAM
if use_imm:
res_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm_op(a_buf(*i), imm),
"res_buf") #compute
else:
b = te.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="b",
dtype=env.acc_dtype)
b_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: b(*i),
"b_buf") #DRAM->SRAM
res_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm_op(a_buf(*i), b_buf(*i)),
"res_buf") #compute5B
res = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: res_buf(*i).astype(env.inp_dtype),
"res") #SRAM->DRAM
# schedule
s = te.create_schedule(res.op)
s[a_buf].set_scope(env.acc_scope) # SRAM
s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
s[res_buf].set_scope(env.acc_scope) # SRAM
s[res_buf].pragma(res_buf.op.axis[0], env.alu) # compute
s[res].pragma(res.op.axis[0], env.dma_copy) # SRAM->DRAM
if not use_imm:
s[b_buf].set_scope(env.acc_scope) # SRAM
s[b_buf].pragma(b_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
if not remote:
return
# build
with vta.build_config():
if use_imm:
mod = vta.build(s, [a, res], "ext_dev", env.target_host)
else:
mod = vta.build(s, [a, b, res], "ext_dev", env.target_host)
temp = util.tempdir()
mod.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
a_np = np.random.randint(
-16, 16, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
if use_imm:
res_np = np_op(a_np, imm) if np_op else tvm_op(a_np, imm)
else:
b_np = np.random.randint(
-16, 16, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(b.dtype)
res_np = np_op(a_np, b_np) if np_op else tvm_op(a_np, b_np)
res_np = res_np.astype(res.dtype)
a_nd = tvm.nd.array(a_np, ctx)
res_nd = tvm.nd.array(
np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
if use_imm:
f(a_nd, res_nd)
else:
b_nd = tvm.nd.array(b_np, ctx)
f(a_nd, b_nd, res_nd)
np.testing.assert_equal(res_np, res_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("ALU {} execution statistics:".format(test_name))
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
check_alu(lambda x, y: x << y, np.left_shift, use_imm=True, test_name="SHL")
check_alu(tvm.te.max, np.maximum, use_imm=True, test_name="MAX")
check_alu(tvm.te.max, np.maximum, test_name="MAX")
check_alu(lambda x, y: x + y, use_imm=True, test_name="ADD")
check_alu(lambda x, y: x + y, test_name="ADD")
check_alu(lambda x, y: x >> y, np.right_shift, use_imm=True, test_name="SHR")
    vta.testing.run(_run)


def test_relu():
"""Test RELU on ALU"""
def _run(env, remote):
m = 8
n = 10
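        # ReLU is expressed as max(x, 0) followed by a min() that saturates
        # the result to the largest value representable in inp_dtype.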
# compute
a = te.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="a",
dtype=env.acc_dtype)
a_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a(*i),
"a_buf") # DRAM->SRAM
max_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm.te.max(a_buf(*i), 0),
"res_buf") # relu
min_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm.te.min(max_buf(*i), (1<<(env.INP_WIDTH-1))-1),
"max_buf") # relu
res = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: min_buf(*i).astype(env.inp_dtype),
"min_buf") # SRAM->DRAM
# schedule
s = te.create_schedule(res.op)
s[a_buf].set_scope(env.acc_scope) # SRAM
s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
s[max_buf].set_scope(env.acc_scope) # SRAM
s[min_buf].set_scope(env.acc_scope) # SRAM
s[max_buf].pragma(max_buf.op.axis[0], env.alu) # compute
s[min_buf].pragma(min_buf.op.axis[0], env.alu) # compute
s[res].pragma(res.op.axis[0], env.dma_copy) # SRAM->DRAM
# build
with vta.build_config():
mod = vta.build(s, [a, res], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
mod.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
a_np = np.random.randint(
-256, 256, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
res_np = np.clip(a_np, 0, (1<<(env.INP_WIDTH-1))-1).astype(res.dtype)
a_nd = tvm.nd.array(a_np, ctx)
res_nd = tvm.nd.array(
np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(a_nd, res_nd)
np.testing.assert_equal(res_np, res_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Relu execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
    vta.testing.run(_run)


def test_shift_and_scale():
"""Test shift and scale on ALU"""
def _run(env, remote):
m = 2
n = 8
imm_shift = np.random.randint(0,8)
imm_scale = np.random.randint(1,5)
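        # Despite the names, imm_shift is an additive offset and imm_scale is
        # a right-shift amount (i.e. scaling down by a power of two).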
# compute
a = te.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="a", dtype=env.acc_dtype)
a_buf = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a(*i),
"a_buf") # DRAM->SRAM
res_shift = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a_buf(*i)+imm_shift,
"res_shift") # compute
res_scale = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: res_shift(*i)>>imm_scale,
"res_scale") # compute
res = te.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: res_scale(*i).astype(env.inp_dtype),
"res") # SRAM->DRAM
# schedule
s = te.create_schedule(res.op)
s[a_buf].set_scope(env.acc_scope) # SRAM
s[res_shift].set_scope(env.acc_scope) # SRAM
s[res_scale].set_scope(env.acc_scope) # SRAM
s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
s[res_shift].pragma(res_shift.op.axis[0], env.alu) # compute
s[res_scale].pragma(res_scale.op.axis[0], env.alu) # compute
s[res].pragma(res.op.axis[0], env.dma_copy) # SRAM->DRAM
# build
mod = vta.build(s, [a, res], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
mod.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
a_np = np.random.randint(
-10, 10, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
res_np = np.right_shift((a_np + imm_shift), imm_scale)
res_np = res_np.astype(res.dtype)
a_nd = tvm.nd.array(a_np, ctx)
res_nd = tvm.nd.array(
np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(a_nd, res_nd)
np.testing.assert_equal(res_np, res_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Shift and scale execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
    vta.testing.run(_run)


def test_runtime_array():
def _run(env, remote):
n = 100
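        # Round-trip an array through device memory to check the runtime's
        # upload/download path.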
ctx = remote.ext_dev(0)
x_np = np.random.randint(
1, 10, size=(n, n, env.BATCH, env.BLOCK_OUT)).astype("int8")
x_nd = tvm.nd.array(x_np, ctx)
np.testing.assert_equal(x_np, x_nd.asnumpy())
    vta.testing.run(_run)


if __name__ == "__main__":
test_runtime_array()
test_save_load_out()
test_padded_load()
test_gemm()
test_alu()
test_relu()
test_shift_and_scale()