# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit test VTA's instructions """
import tvm
import numpy as np
import topi
from tvm.contrib import util
import vta
import vta.testing
from vta.testing import simulator
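# These tests exercise individual VTA instructions (load/store, GEMM and the
# vector ALU) through small hand-written TVM schedules.  vta.testing.run()
# invokes each _run(env, remote) body on the target selected by the active VTA
# configuration: "sim"/"tsim" use the bundled simulators, while hardware
# targets are reached through the RPC remote.  To run the file standalone,
# something like the following should work (the exact path depends on your
# checkout):
#
#   python vta/tests/python/unittest/test_vta_insn.py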
def test_save_load_out():
"""Test save/store output command"""
def _run(env, remote):
n = 6
x = tvm.placeholder(
(n, n, env.BATCH, env.BLOCK_OUT),
name="x",
dtype=env.acc_dtype)
x_buf = tvm.compute(
(n, n, env.BATCH, env.BLOCK_OUT),
lambda *i: x(*i), "x_buf")
# insert no-op that won't be optimized away
y_buf = tvm.compute(
(n, n, env.BATCH, env.BLOCK_OUT),
lambda *i: x_buf(*i)>>0, "y_buf")
y = tvm.compute(
(n, n, env.BATCH, env.BLOCK_OUT),
lambda *i: y_buf(*i).astype(env.inp_dtype), "y")
# schedule
s = tvm.create_schedule(y.op)
s[x_buf].set_scope(env.acc_scope)
s[x_buf].pragma(x_buf.op.axis[0], env.dma_copy)
s[y_buf].set_scope(env.acc_scope)
s[y_buf].pragma(y_buf.op.axis[0], env.alu)
s[y].pragma(y.op.axis[0], env.dma_copy)
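        # The scopes and pragmas above decide how each stage is lowered: the
        # dma_copy pragmas on the buffer copies should become VTA load/store
        # instructions, while the alu pragma on y_buf should turn the no-op
        # shift into a single ALU instruction (the usual VTA lowering path).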
# verification
with vta.build_config():
m = vta.build(s, [x, y], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
m.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
x_np = np.random.randint(
1, 10, size=(n, n, env.BATCH, env.BLOCK_OUT)).astype(x.dtype)
y_np = x_np.astype(y.dtype)
x_nd = tvm.nd.array(x_np, ctx)
y_nd = tvm.nd.empty(y_np.shape, ctx=ctx, dtype=y_np.dtype)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(x_nd, y_nd)
np.testing.assert_equal(y_np, y_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Save load execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
vta.testing.run(_run)
def test_padded_load():
"""Test padded load."""
def _run(env, remote):
# declare
n = 3
m = 5
pad_before = [2, 1, 0, 0]
pad_after = [1, 2, 0, 0]
x = tvm.placeholder(
(n, m, env.BATCH, env.BLOCK_OUT),
name="x",
dtype=env.acc_dtype)
x_buf = topi.nn.pad(x, pad_before, pad_after, name="y")
# insert no-op that won't be optimized away
y_buf = tvm.compute((n + pad_before[0] + pad_after[0],
m + pad_before[1] + pad_after[1],
env.BATCH,
env.BLOCK_OUT), lambda *i: x_buf(*i)>>0, "y_buf")
y = tvm.compute((n + pad_before[0] + pad_after[0],
m + pad_before[1] + pad_after[1],
env.BATCH,
env.BLOCK_OUT), lambda *i: y_buf(*i).astype(env.inp_dtype), "y")
# schedule
s = tvm.create_schedule(y.op)
s[x_buf].set_scope(env.acc_scope)
s[x_buf].pragma(x_buf.op.axis[0], env.dma_copy)
s[y_buf].set_scope(env.acc_scope)
s[y_buf].pragma(y_buf.op.axis[0], env.alu)
s[y].pragma(y.op.axis[0], env.dma_copy)
# build
with vta.build_config():
mod = vta.build(s, [x, y], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
mod.save(temp.relpath("padded_load.o"))
remote.upload(temp.relpath("padded_load.o"))
f = remote.load_module("padded_load.o")
# verify
ctx = remote.ext_dev(0)
x_np = np.random.randint(-10, 10, size=(
n, m, env.BATCH, env.BLOCK_OUT)).astype(x.dtype)
y_np = np.zeros((n + pad_before[0] + pad_after[0],
m + pad_before[1] + pad_after[1],
env.BATCH,
env.BLOCK_OUT)).astype(y.dtype)
y_np[pad_before[0]:pad_before[0] + n,
pad_before[1]:pad_before[1] + m,
:] = x_np
x_nd = tvm.nd.array(x_np, ctx)
y_nd = tvm.nd.empty(y_np.shape, ctx=ctx, dtype=y_np.dtype)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(x_nd, y_nd)
np.testing.assert_equal(y_np, y_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Padded load execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
vta.testing.run(_run)
def test_gemm():
"""Test GEMM."""
def _run(env, remote):
# declare
o = 4
n = 1
m = 4
x = tvm.placeholder((o, n, env.BATCH, env.BLOCK_IN), name="x", dtype=env.inp_dtype)
w = tvm.placeholder((m, n, env.BLOCK_OUT, env.BLOCK_IN), name="w", dtype=env.wgt_dtype)
x_buf = tvm.compute((o, n, env.BATCH, env.BLOCK_IN), lambda *i: x(*i), "x_buf")
w_buf = tvm.compute((m, n, env.BLOCK_OUT, env.BLOCK_IN), lambda *i: w(*i), "w_buf")
ko = tvm.reduce_axis((0, n), name="ko")
ki = tvm.reduce_axis((0, env.BLOCK_IN), name="ki")
y_gem = tvm.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda bo, co, bi, ci:
tvm.sum(x_buf[bo, ko, bi, ki].astype(env.acc_dtype) *
w_buf[co, ko, ci, ki].astype(env.acc_dtype),
axis=[ko, ki]),
name="y_gem")
y_shf = tvm.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: y_gem(*i)>>8,
name="y_shf")
y_max = tvm.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm.max(y_shf(*i), 0),
"y_max") #relu
y_min = tvm.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm.min(y_max(*i), (1<<(env.INP_WIDTH-1))-1),
"y_min") #relu
y = tvm.compute(
(o, m, env.BATCH, env.BLOCK_OUT),
lambda *i: y_min(*i).astype(env.inp_dtype),
name="y")
if not remote:
return
def verify(s, name=None):
mod = vta.build(s, [x, w, y], "ext_dev", env.target_host)
temp = util.tempdir()
mod.save(temp.relpath("gemm.o"))
remote.upload(temp.relpath("gemm.o"))
f = remote.load_module("gemm.o")
# verify
ctx = remote.ext_dev(0)
x_np = np.random.randint(
-128, 128, size=(o, n, env.BATCH, env.BLOCK_IN)).astype(x.dtype)
w_np = np.random.randint(
-128, 128, size=(m, n, env.BLOCK_OUT, env.BLOCK_IN)).astype(w.dtype)
y_np = np.zeros((o, m, env.BATCH, env.BLOCK_OUT)).astype(y.dtype)
x_nd = tvm.nd.array(x_np, ctx)
w_nd = tvm.nd.array(w_np, ctx)
y_nd = tvm.nd.array(y_np, ctx)
y_np = y_np.astype(env.acc_dtype)
for b in range(o):
for i in range(m):
for j in range(n):
y_np[b,i,:] += np.dot(x_np[b,j,:].astype(env.acc_dtype),
w_np[i,j].T.astype(env.acc_dtype))
y_np = np.right_shift(y_np, 8)
y_np = np.clip(y_np, 0, (1<<(env.INP_WIDTH-1))-1).astype(y.dtype)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(x_nd, w_nd, y_nd)
np.testing.assert_equal(y_np, y_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("GEMM schedule:{} execution statistics:".format(name))
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
def test_schedule1():
# default schedule with no smt
s = tvm.create_schedule(y.op)
# set the scope of the SRAM buffers
s[x_buf].set_scope(env.inp_scope)
s[w_buf].set_scope(env.wgt_scope)
s[y_gem].set_scope(env.acc_scope)
s[y_shf].set_scope(env.acc_scope)
s[y_max].set_scope(env.acc_scope)
s[y_min].set_scope(env.acc_scope)
# set pragmas for DMA transfer and ALU ops
s[x_buf].compute_at(s[y_gem], ko)
s[x_buf].pragma(s[x_buf].op.axis[0], env.dma_copy)
s[w_buf].compute_at(s[y_gem], ko)
s[w_buf].pragma(s[w_buf].op.axis[0], env.dma_copy)
s[y_shf].pragma(s[y_shf].op.axis[0], env.alu)
s[y_max].pragma(s[y_max].op.axis[0], env.alu)
s[y_min].pragma(s[y_min].op.axis[0], env.alu)
s[y].pragma(s[y].op.axis[0], env.dma_copy)
# tensorization
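            # Reorder so that the reduction axis ko is outermost (the input
            # and weight tiles are DMA-ed in per ko iteration via compute_at
            # above) and ki is innermost; the (BATCH, BLOCK_OUT, ki)
            # sub-computation left under axis[2] then matches the shape of the
            # env.gemm intrinsic and can be tensorized onto the GEMM core.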
s[y_gem].reorder(
ko,
s[y_gem].op.axis[0],
s[y_gem].op.axis[1],
s[y_gem].op.axis[2],
s[y_gem].op.axis[3],
ki)
s[y_gem].tensorize(s[y_gem].op.axis[2], env.gemm)
verify(s, name="default")
def test_smt():
# test smt schedule
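            # "smt" here is realised with TVM virtual threads: splitting the
            # outermost output axis in two and binding it to "cthread" should
            # produce two interleaved instruction streams, which VTA uses to
            # overlap loads with compute (latency hiding), not OS threads.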
s = tvm.create_schedule(y.op)
s[x_buf].set_scope(env.inp_scope)
s[w_buf].set_scope(env.wgt_scope)
s[y_gem].set_scope(env.acc_scope)
s[y_shf].set_scope(env.acc_scope)
s[y_max].set_scope(env.acc_scope)
s[y_min].set_scope(env.acc_scope)
abo, aco, abi, aci = s[y].op.axis
abo1, abo2 = s[y].split(abo, nparts=2)
s[y].bind(abo1, tvm.thread_axis("cthread"))
s[y_gem].compute_at(s[y], abo1)
s[y_shf].compute_at(s[y], abo1)
s[y_max].compute_at(s[y], abo1)
s[y_min].compute_at(s[y], abo1)
s[y_gem].reorder(
ko,
s[y_gem].op.axis[0],
s[y_gem].op.axis[1],
s[y_gem].op.axis[2],
s[y_gem].op.axis[3],
ki)
s[y_gem].tensorize(s[y_gem].op.axis[2], env.gemm)
s[y_shf].pragma(s[y_shf].op.axis[0], env.alu)
s[y_max].pragma(s[y_max].op.axis[0], env.alu)
s[y_min].pragma(s[y_min].op.axis[0], env.alu)
s[x_buf].compute_at(s[y_gem], ko)
s[x_buf].pragma(s[x_buf].op.axis[0], env.dma_copy)
s[w_buf].compute_at(s[y_gem], ko)
s[w_buf].pragma(s[w_buf].op.axis[0], env.dma_copy)
s[y].pragma(abo2, env.dma_copy)
verify(s, name="smt")
test_schedule1()
test_smt()
vta.testing.run(_run)
def test_alu():
def _run(env, remote):
def check_alu(tvm_op, np_op=None, use_imm=False, test_name=None):
"""Test ALU"""
m = 8
n = 8
imm = np.random.randint(1,5)
# compute
a = tvm.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="a",
dtype=env.acc_dtype)
a_buf = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a(*i),
"a_buf") #DRAM->SRAM
if use_imm:
res_buf = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm_op(a_buf(*i), imm),
"res_buf") #compute
else:
b = tvm.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="b",
dtype=env.acc_dtype)
b_buf = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: b(*i),
"b_buf") #DRAM->SRAM
res_buf = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: tvm_op(a_buf(*i), b_buf(*i)),
"res_buf") #compute5B
res = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: res_buf(*i).astype(env.inp_dtype),
"res") #SRAM->DRAM
# schedule
s = tvm.create_schedule(res.op)
s[a_buf].set_scope(env.acc_scope) # SRAM
s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
s[res_buf].set_scope(env.acc_scope) # SRAM
s[res_buf].pragma(res_buf.op.axis[0], env.alu) # compute
s[res].pragma(res.op.axis[0], env.dma_copy) # SRAM->DRAM
if not use_imm:
s[b_buf].set_scope(env.acc_scope) # SRAM
s[b_buf].pragma(b_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
if not remote:
return
# build
with vta.build_config():
if use_imm:
mod = vta.build(s, [a, res], "ext_dev", env.target_host)
else:
mod = vta.build(s, [a, b, res], "ext_dev", env.target_host)
temp = util.tempdir()
mod.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
a_np = np.random.randint(
-16, 16, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
if use_imm:
res_np = np_op(a_np, imm) if np_op else tvm_op(a_np, imm)
else:
b_np = np.random.randint(
-16, 16, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(b.dtype)
res_np = np_op(a_np, b_np) if np_op else tvm_op(a_np, b_np)
res_np = res_np.astype(res.dtype)
a_nd = tvm.nd.array(a_np, ctx)
res_nd = tvm.nd.array(
np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
if use_imm:
f(a_nd, res_nd)
else:
b_nd = tvm.nd.array(b_np, ctx)
f(a_nd, b_nd, res_nd)
np.testing.assert_equal(res_np, res_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("ALU {} execution statistics:".format(test_name))
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
check_alu(lambda x, y: x << y, np.left_shift, use_imm=True, test_name="SHL")
check_alu(tvm.max, np.maximum, use_imm=True, test_name="MAX")
check_alu(tvm.max, np.maximum, test_name="MAX")
check_alu(lambda x, y: x + y, use_imm=True, test_name="ADD")
check_alu(lambda x, y: x + y, test_name="ADD")
check_alu(lambda x, y: x >> y, np.right_shift, use_imm=True, test_name="SHR")
vta.testing.run(_run)
def test_relu():
"""Test RELU on ALU"""
def _run(env, remote):
m = 8
n = 10
# compute
a = tvm.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="a",
dtype=env.acc_dtype)
a_buf = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a(*i),
"a_buf") # DRAM->SRAM
        max_buf = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: tvm.max(a_buf(*i), 0),
            "max_buf") # relu
        min_buf = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: tvm.min(max_buf(*i), (1<<(env.INP_WIDTH-1))-1),
            "min_buf") # clip to INP_WIDTH range
        res = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: min_buf(*i).astype(env.inp_dtype),
            "res") # SRAM->DRAM
# schedule
s = tvm.create_schedule(res.op)
s[a_buf].set_scope(env.acc_scope) # SRAM
s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
s[max_buf].set_scope(env.acc_scope) # SRAM
s[min_buf].set_scope(env.acc_scope) # SRAM
s[max_buf].pragma(max_buf.op.axis[0], env.alu) # compute
s[min_buf].pragma(min_buf.op.axis[0], env.alu) # compute
s[res].pragma(res.op.axis[0], env.dma_copy) # SRAM->DRAM
# build
with vta.build_config():
mod = vta.build(s, [a, res], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
mod.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
a_np = np.random.randint(
-256, 256, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
res_np = np.clip(a_np, 0, (1<<(env.INP_WIDTH-1))-1).astype(res.dtype)
a_nd = tvm.nd.array(a_np, ctx)
res_nd = tvm.nd.array(
np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(a_nd, res_nd)
np.testing.assert_equal(res_np, res_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Relu execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
vta.testing.run(_run)
def test_shift_and_scale():
"""Test shift and scale on ALU"""
def _run(env, remote):
m = 2
n = 8
imm_shift = np.random.randint(0,8)
imm_scale = np.random.randint(1,5)
# compute
a = tvm.placeholder(
(m, n, env.BATCH, env.BLOCK_OUT),
name="a", dtype=env.acc_dtype)
a_buf = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a(*i),
"a_buf") # DRAM->SRAM
res_shift = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: a_buf(*i)+imm_shift,
"res_shift") # compute
res_scale = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: res_shift(*i)>>imm_scale,
"res_scale") # compute
res = tvm.compute(
(m, n, env.BATCH, env.BLOCK_OUT),
lambda *i: res_scale(*i).astype(env.inp_dtype),
"res") # SRAM->DRAM
# schedule
s = tvm.create_schedule(res.op)
s[a_buf].set_scope(env.acc_scope) # SRAM
s[res_shift].set_scope(env.acc_scope) # SRAM
s[res_scale].set_scope(env.acc_scope) # SRAM
s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy) # DRAM->SRAM
s[res_shift].pragma(res_shift.op.axis[0], env.alu) # compute
s[res_scale].pragma(res_scale.op.axis[0], env.alu) # compute
s[res].pragma(res.op.axis[0], env.dma_copy) # SRAM->DRAM
# build
mod = vta.build(s, [a, res], "ext_dev", env.target_host)
if not remote:
return
temp = util.tempdir()
mod.save(temp.relpath("load_act.o"))
remote.upload(temp.relpath("load_act.o"))
f = remote.load_module("load_act.o")
# verify
ctx = remote.ext_dev(0)
a_np = np.random.randint(
-10, 10, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
res_np = np.right_shift((a_np + imm_shift), imm_scale)
res_np = res_np.astype(res.dtype)
a_nd = tvm.nd.array(a_np, ctx)
res_nd = tvm.nd.array(
np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
if env.TARGET in ["sim", "tsim"]:
simulator.clear_stats()
f(a_nd, res_nd)
np.testing.assert_equal(res_np, res_nd.asnumpy())
if env.TARGET in ["sim", "tsim"]:
sim_stats = simulator.stats()
print("Shift and scale execution statistics:")
for k, v in sim_stats.items():
print("\t{:<16}: {:>16}".format(k, v))
vta.testing.run(_run)
def test_runtime_array():
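    """Test that an array round-trips through the VTA runtime unchanged."""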
def _run(env, remote):
n = 100
ctx = remote.ext_dev(0)
x_np = np.random.randint(
1, 10, size=(n, n, env.BATCH, env.BLOCK_OUT)).astype("int8")
x_nd = tvm.nd.array(x_np, ctx)
np.testing.assert_equal(x_np, x_nd.asnumpy())
vta.testing.run(_run)
if __name__ == "__main__":
test_runtime_array()
test_save_load_out()
test_padded_load()
test_gemm()
test_alu()
test_relu()
test_shift_and_scale()