blob: 18587acd46aeb4ad6e8457c9c2cab28fd84e7001 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import datetime
import glob
import logging
import os
import subprocess
import sys
import logging
import pytest
import numpy as np
import onnx
from PIL import Image
import tvm
import tvm.rpc
import tvm.micro
import tvm.testing
import tvm.relay as relay
from tvm.micro.contrib import zephyr
from tvm.contrib import utils
from tvm.relay.expr_functor import ExprMutator
from tvm.relay.op.annotation import compiler_begin, compiler_end
import conftest
_LOG = logging.getLogger(__name__)
PLATFORMS = conftest.PLATFORMS
def _make_sess_from_op(model, zephyr_board, west_cmd, op_name, sched, arg_bufs, build_config):
target = tvm.target.target.micro(model)
target = tvm.target.Target(target=target, host=target)
with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}):
mod = tvm.build(sched, arg_bufs, target=target, name=op_name)
return _make_session(model, target, zephyr_board, west_cmd, mod, build_config)
def _make_session(model, target, zephyr_board, west_cmd, mod, build_config):
parent_dir = os.path.dirname(__file__)
filename = os.path.splitext(os.path.basename(__file__))[0]
prev_build = f"{os.path.join(parent_dir, 'archive')}_{filename}_{zephyr_board}_last_build.micro"
workspace_root = os.path.join(
f"{os.path.join(parent_dir, 'workspace')}_{filename}_{zephyr_board}",
datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S"),
)
workspace_parent = os.path.dirname(workspace_root)
if not os.path.exists(workspace_parent):
os.makedirs(workspace_parent)
workspace = tvm.micro.Workspace(debug=True, root=workspace_root)
test_dir = os.path.dirname(os.path.realpath(os.path.expanduser(__file__)))
tvm_source_dir = os.path.join(test_dir, "..", "..", "..")
runtime_path = os.path.join(tvm_source_dir, "apps", "microtvm", "zephyr", "host_driven")
compiler = zephyr.ZephyrCompiler(
project_dir=runtime_path,
board=zephyr_board,
zephyr_toolchain_variant="zephyr",
west_cmd=west_cmd,
)
opts = tvm.micro.default_options(os.path.join(runtime_path, "crt"))
# TODO(weberlo) verify this is necessary
opts["bin_opts"]["ccflags"] = ["-std=gnu++14"]
opts["lib_opts"]["ccflags"] = ["-std=gnu++14"]
flasher_kw = {}
if build_config["debug"]:
flasher_kw["debug_rpc_session"] = tvm.rpc.connect("127.0.0.1", 9090)
session_kw = {
"flasher": compiler.flasher(**flasher_kw),
}
if not build_config["skip_build"]:
session_kw["binary"] = tvm.micro.build_static_runtime(
# the x86 compiler *expects* you to give the exact same dictionary for both
# lib_opts and bin_opts. so the library compiler is mutating lib_opts and
# the binary compiler is expecting those mutations to be in bin_opts.
# TODO(weberlo) fix this very bizarre behavior
workspace,
compiler,
mod,
opts,
)
if os.path.exists(prev_build):
os.unlink(prev_build)
session_kw["binary"].archive(prev_build, metadata_only=True)
else:
unarchive_dir = utils.tempdir()
session_kw["binary"] = tvm.micro.MicroBinary.unarchive(
prev_build, unarchive_dir.relpath("binary")
)
return tvm.micro.Session(**session_kw)
def _make_add_sess(model, zephyr_board, west_cmd, build_config):
A = tvm.te.placeholder((2,), dtype="int8")
B = tvm.te.placeholder((1,), dtype="int8")
C = tvm.te.compute(A.shape, lambda i: A[i] + B[0], name="C")
sched = tvm.te.create_schedule(C.op)
return _make_sess_from_op(model, zephyr_board, west_cmd, "add", sched, [A, B, C], build_config)
# The same test code can be executed on both the QEMU simulation and on real hardware.
@tvm.testing.requires_micro
def test_compile_runtime(platform, west_cmd, skip_build, tvm_debug):
"""Test compiling the on-device runtime."""
model, zephyr_board = PLATFORMS[platform]
build_config = {"skip_build": skip_build, "debug": tvm_debug}
# NOTE: run test in a nested function so cPython will delete arrays before closing the session.
def test_basic_add(sess):
A_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device)
assert (A_data.numpy() == np.array([2, 3])).all()
B_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device)
assert (B_data.numpy() == np.array([4])).all()
C_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device)
assert (C_data.numpy() == np.array([0, 0])).all()
system_lib = sess.get_system_lib()
system_lib.get_function("add")(A_data, B_data, C_data)
assert (C_data.numpy() == np.array([6, 7])).all()
with _make_add_sess(model, zephyr_board, west_cmd, build_config) as sess:
test_basic_add(sess)
@tvm.testing.requires_micro
def test_platform_timer(platform, west_cmd, skip_build, tvm_debug):
"""Test compiling the on-device runtime."""
model, zephyr_board = PLATFORMS[platform]
build_config = {"skip_build": skip_build, "debug": tvm_debug}
# NOTE: run test in a nested function so cPython will delete arrays before closing the session.
def test_basic_add(sess):
A_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device)
assert (A_data.numpy() == np.array([2, 3])).all()
B_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device)
assert (B_data.numpy() == np.array([4])).all()
C_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device)
assert (C_data.numpy() == np.array([0, 0])).all()
system_lib = sess.get_system_lib()
time_eval_f = system_lib.time_evaluator(
"add", sess.device, number=20, repeat=3, min_repeat_ms=40
)
result = time_eval_f(A_data, B_data, C_data)
assert (C_data.numpy() == np.array([6, 7])).all()
assert result.mean > 0
assert len(result.results) == 3
with _make_add_sess(model, zephyr_board, west_cmd, build_config) as sess:
test_basic_add(sess)
@tvm.testing.requires_micro
def test_relay(platform, west_cmd, skip_build, tvm_debug):
"""Testing a simple relay graph"""
model, zephyr_board = PLATFORMS[platform]
build_config = {"skip_build": skip_build, "debug": tvm_debug}
shape = (10,)
dtype = "int8"
# Construct Relay program.
x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
xx = relay.multiply(x, x)
z = relay.add(xx, relay.const(np.ones(shape=shape, dtype=dtype)))
func = relay.Function([x], z)
target = tvm.target.target.micro(model)
with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}):
graph, mod, params = tvm.relay.build(func, target=target)
with _make_session(model, target, zephyr_board, west_cmd, mod, build_config) as session:
graph_mod = tvm.micro.create_local_graph_executor(
graph, session.get_system_lib(), session.device
)
graph_mod.set_input(**params)
x_in = np.random.randint(10, size=shape[0], dtype=dtype)
graph_mod.run(x=x_in)
result = graph_mod.get_output(0).numpy()
tvm.testing.assert_allclose(graph_mod.get_input(0).numpy(), x_in)
tvm.testing.assert_allclose(result, x_in * x_in + 1)
@tvm.testing.requires_micro
def test_onnx(platform, west_cmd, skip_build, tvm_debug):
"""Testing a simple ONNX model."""
model, zephyr_board = PLATFORMS[platform]
build_config = {"skip_build": skip_build, "debug": tvm_debug}
# Load test images.
this_dir = os.path.dirname(__file__)
digit_2 = Image.open(f"{this_dir}/testdata/digit-2.jpg").resize((28, 28))
digit_2 = np.asarray(digit_2).astype("float32")
digit_2 = np.expand_dims(digit_2, axis=0)
digit_9 = Image.open(f"{this_dir}/testdata/digit-9.jpg").resize((28, 28))
digit_9 = np.asarray(digit_9).astype("float32")
digit_9 = np.expand_dims(digit_9, axis=0)
# Load ONNX model and convert to Relay.
onnx_model = onnx.load(f"{this_dir}/testdata/mnist-8.onnx")
shape = {"Input3": (1, 1, 28, 28)}
relay_mod, params = relay.frontend.from_onnx(onnx_model, shape=shape, freeze_params=True)
relay_mod = relay.transform.DynamicToStatic()(relay_mod)
# We add the -link-params=1 option to ensure the model parameters are compiled in.
# There is currently a bug preventing the host_driven environment from receiving
# the model weights when set using graph_mod.set_input().
# See: https://github.com/apache/tvm/issues/7567
target = tvm.target.target.micro(model, options=["-link-params=1"])
with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}):
lowered = relay.build(relay_mod, target, params=params)
graph = lowered.get_graph_json()
with _make_session(model, target, zephyr_board, west_cmd, lowered.lib, build_config) as session:
graph_mod = tvm.micro.create_local_graph_executor(
graph, session.get_system_lib(), session.device
)
# Send the digit-2 image and confirm that the correct result is returned.
graph_mod.set_input("Input3", tvm.nd.array(digit_2))
graph_mod.run()
result = graph_mod.get_output(0).numpy()
assert np.argmax(result) == 2
# Send the digit-9 image and confirm that the correct result is returned.
graph_mod.set_input("Input3", tvm.nd.array(digit_9))
graph_mod.run()
result = graph_mod.get_output(0).numpy()
assert np.argmax(result) == 9
class CcompilerAnnotator(ExprMutator):
"""
This is used to create external functions for ccompiler.
A simple annotator that creates the following program:
|
-- begin --
|
add
|
subtract
|
multiply
|
-- end --
|
"""
def __init__(self):
super(CcompilerAnnotator, self).__init__()
self.in_compiler = 0
def visit_call(self, call):
if call.op.name == "add": # Annotate begin at args
if self.in_compiler == 1:
lhs = compiler_begin(super().visit(call.args[0]), "ccompiler")
rhs = compiler_begin(super().visit(call.args[1]), "ccompiler")
op = relay.add(lhs, rhs)
self.in_compiler = 2
return op
elif call.op.name == "subtract":
if self.in_compiler == 1:
lhs = super().visit(call.args[0])
rhs = super().visit(call.args[1])
if isinstance(lhs, relay.expr.Var):
lhs = compiler_begin(lhs, "ccompiler")
if isinstance(rhs, relay.expr.Var):
rhs = compiler_begin(rhs, "ccompiler")
return relay.subtract(lhs, rhs)
elif call.op.name == "multiply": # Annotate end at output
self.in_compiler = 1
lhs = super().visit(call.args[0])
rhs = super().visit(call.args[1])
if isinstance(lhs, relay.expr.Var):
lhs = compiler_begin(lhs, "ccompiler")
if isinstance(rhs, relay.expr.Var):
rhs = compiler_begin(rhs, "ccompiler")
op = relay.multiply(lhs, rhs)
if self.in_compiler == 2:
op = compiler_end(op, "ccompiler")
self.in_compiler = 0
return op
return super().visit_call(call)
def check_result(
relay_mod, model, zephyr_board, west_cmd, map_inputs, out_shape, result, build_config
):
"""Helper function to verify results"""
TOL = 1e-5
target = tvm.target.target.micro(model)
with tvm.transform.PassContext(opt_level=3, config={"tir.disable_vectorize": True}):
graph, mod, params = tvm.relay.build(relay_mod, target=target)
with _make_session(model, target, zephyr_board, west_cmd, mod, build_config) as session:
rt_mod = tvm.micro.create_local_graph_executor(
graph, session.get_system_lib(), session.device
)
rt_mod.set_input(**params)
for name, data in map_inputs.items():
rt_mod.set_input(name, data)
rt_mod.set_input(**params)
rt_mod.run()
out_shapes = out_shape if isinstance(out_shape, list) else [out_shape]
results = result if isinstance(result, list) else [result]
for idx, shape in enumerate(out_shapes):
out = tvm.nd.empty(shape, device=session.device)
out = rt_mod.get_output(idx, out)
tvm.testing.assert_allclose(out.numpy(), results[idx], rtol=TOL, atol=TOL)
@tvm.testing.requires_micro
def test_byoc_microtvm(platform, west_cmd, skip_build, tvm_debug):
"""This is a simple test case to check BYOC capabilities of microTVM"""
model, zephyr_board = PLATFORMS[platform]
build_config = {"skip_build": skip_build, "debug": tvm_debug}
x = relay.var("x", shape=(10, 10))
w0 = relay.var("w0", shape=(10, 10))
w1 = relay.var("w1", shape=(10, 10))
w2 = relay.var("w2", shape=(10, 10))
w3 = relay.var("w3", shape=(10, 10))
w4 = relay.var("w4", shape=(10, 10))
w5 = relay.var("w5", shape=(10, 10))
w6 = relay.var("w6", shape=(10, 10))
w7 = relay.var("w7", shape=(10, 10))
# C compiler
z0 = relay.add(x, w0)
p0 = relay.subtract(z0, w1)
q0 = relay.multiply(p0, w2)
z1 = relay.add(x, w3)
p1 = relay.subtract(z1, w4)
q1 = relay.multiply(p1, w5)
# Other parts on TVM
z2 = relay.add(x, w6)
q2 = relay.subtract(z2, w7)
r = relay.concatenate((q0, q1, q2), axis=0)
f = relay.Function([x, w0, w1, w2, w3, w4, w5, w6, w7], r)
mod = tvm.IRModule()
ann = CcompilerAnnotator()
mod["main"] = ann.visit(f)
mod = tvm.relay.transform.PartitionGraph()(mod)
mod = tvm.relay.transform.InferType()(mod)
x_data = np.random.rand(10, 10).astype("float32")
w_data = []
for _ in range(8):
w_data.append(np.random.rand(10, 10).astype("float32"))
map_inputs = {"w{}".format(i): w_data[i] for i in range(8)}
map_inputs["x"] = x_data
check_result(
relay_mod=mod,
map_inputs=map_inputs,
out_shape=(30, 10),
result=np.concatenate(
(
((x_data + w_data[0]) - w_data[1]) * w_data[2],
((x_data + w_data[3]) - w_data[4]) * w_data[5],
x_data + w_data[6] - w_data[7],
),
axis=0,
),
model=model,
zephyr_board=zephyr_board,
west_cmd=west_cmd,
build_config=build_config,
)
def _make_add_sess_with_shape(model, zephyr_board, west_cmd, shape, build_config):
A = tvm.te.placeholder(shape, dtype="int8")
C = tvm.te.compute(A.shape, lambda i: A[i] + A[i], name="C")
sched = tvm.te.create_schedule(C.op)
return _make_sess_from_op(model, zephyr_board, west_cmd, "add", sched, [A, C], build_config)
@pytest.mark.parametrize(
"shape,",
[
pytest.param((1 * 1024,), id="(1*1024)"),
pytest.param((4 * 1024,), id="(4*1024)"),
pytest.param((16 * 1024,), id="(16*1024)"),
],
)
@tvm.testing.requires_micro
def test_rpc_large_array(platform, west_cmd, skip_build, tvm_debug, shape):
"""Test large RPC array transfer."""
model, zephyr_board = PLATFORMS[platform]
build_config = {"skip_build": skip_build, "debug": tvm_debug}
# NOTE: run test in a nested function so cPython will delete arrays before closing the session.
def test_tensors(sess):
a_np = np.random.randint(low=-128, high=127, size=shape, dtype="int8")
A_data = tvm.nd.array(a_np, device=sess.device)
assert (A_data.asnumpy() == a_np).all()
C_data = tvm.nd.array(np.zeros(shape, dtype="int8"), device=sess.device)
assert (C_data.asnumpy() == np.zeros(shape)).all()
with _make_add_sess_with_shape(model, zephyr_board, west_cmd, shape, build_config) as sess:
test_tensors(sess)
if __name__ == "__main__":
sys.exit(pytest.main([__file__] + sys.argv[1:]))