# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import numpy as np
import tvm
from tvm import te
from tvm.contrib import graph_runtime
from tvm import relay
import tvm.micro as micro

# Use a real micro device - an STM32F746 Discovery board.
# SETUP:
# Be sure to have openocd installed and running
# Ex : openocd -f board/stm32f7discovery.cfg
# Be sure to have the ST CMSIS library downloaded and installed, and the
# CMSIS_ST_PATH environment variable pointed at its CMSIS directory
# Ex : export CMSIS_ST_PATH="/home/yourid/st/STM32Cube_FW_F7_V1.16.0/Drivers/CMSIS"
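# For quick runs without hardware attached, a host-emulated device should also
# work (a sketch, assuming this build ships the host device module under
# tvm.micro.device):
# DEV_CONFIG_A = micro.device.host.generate_config()
# DEV_CONFIG_B = micro.device.host.generate_config()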
# Both configs point at the same openocd instance, so the multi-session tests
# below exercise two logical sessions against one board.
DEV_CONFIG_A = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
DEV_CONFIG_B = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666)
TARGET = "micro_dev"

def relay_micro_build(func, dev_config, params=None):
    """Create a graph runtime module with a micro device context from a Relay function.

    Parameters
    ----------
    func : relay.Function
        function to compile

    dev_config : Dict[str, Any]
        MicroTVM config dict for the target device

    params : dict
        input parameters that do not change during inference

    Returns
    -------
    mod : tvm.runtime.Module
        graph runtime module for the target device
    """
    # Vectorization must be disabled because the C codegen cannot emit
    # vectorized loops; operator fusion is also turned off here.
    with tvm.transform.PassContext(
        disabled_pass={"FuseOps"}, config={"tir.disable_vectorize": True}
    ):
        graph, c_mod, params = relay.build(func, target=TARGET, params=params)
    micro_mod = micro.create_micro_mod(c_mod, dev_config)
    ctx = tvm.micro_dev(0)
    mod = graph_runtime.create(graph, micro_mod, ctx)
    mod.set_input(**params)
    return mod

GDB_INIT_TEMPLATE = """
layout asm
target remote localhost:{gdb_port}
set $pc = UTVMInit
break UTVMDone
"""

def reset_gdbinit():
    if "server_port" not in DEV_CONFIG_A:
        return
    try:
        gdb_init_dir = os.environ["MICRO_GDB_INIT_DIR"]
    except KeyError:
        return
    with open(f"{gdb_init_dir}/.gdbinit", "w") as f:
        gdb_port = DEV_CONFIG_A["server_port"] - 3333
        f.write(GDB_INIT_TEMPLATE.format(gdb_port=gdb_port))

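# For the configs above (assuming generate_config() stores its port argument,
# 6666, under "server_port"), reset_gdbinit() renders the template as:
#
#   layout asm
#   target remote localhost:3333
#   set $pc = UTVMInit
#   break UTVMDone
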
def test_alloc():
    """Test tensor allocation on the device."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    with micro.Session(DEV_CONFIG_A):
        ctx = tvm.micro_dev(0)
        np_tensor = np.random.uniform(size=shape).astype(dtype)
        micro_tensor = tvm.nd.array(np_tensor, ctx)
        tvm.testing.assert_allclose(np_tensor, micro_tensor.asnumpy())

def test_add():
    """Test a module which performs addition."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    reset_gdbinit()
    # Construct TVM expression.
    tvm_shape = tvm.runtime.convert(shape)
    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
    B = te.placeholder(tvm_shape, name="B", dtype=dtype)
    C = te.compute(A.shape, lambda *i: A(*i) + B(*i), name="C")
    s = te.create_schedule(C.op)
    func_name = "fadd"
    c_mod = tvm.build(s, [A, B, C], target="c", name=func_name)
    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        micro_func = micro_mod[func_name]
        ctx = tvm.micro_dev(0)
        a_np = np.random.uniform(size=shape).astype(dtype)
        a = tvm.nd.array(a_np, ctx)
        b_np = np.random.uniform(size=shape).astype(dtype)
        b = tvm.nd.array(b_np, ctx)
        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
        micro_func(a, b, c)
        # ensure inputs weren't corrupted
        tvm.testing.assert_allclose(a.asnumpy(), a_np)
        tvm.testing.assert_allclose(b.asnumpy(), b_np)
        # ensure output is correct
        tvm.testing.assert_allclose(c.asnumpy(), a.asnumpy() + b.asnumpy())

def test_workspace_add():
    """Test a module which uses a workspace to compute an intermediate value."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    reset_gdbinit()
    # Construct TVM expression. B is an intermediate compute stage, so the
    # generated code stores it in a device workspace.
    tvm_shape = tvm.runtime.convert(shape)
    A = te.placeholder(tvm_shape, name="A", dtype=dtype)
    B = te.compute(A.shape, lambda *i: A(*i) + 1, name="B")
    C = te.compute(A.shape, lambda *i: B(*i) + 1, name="C")
    s = te.create_schedule(C.op)
    func_name = "fadd_two_workspace"
    c_mod = tvm.build(s, [A, C], target="c", name=func_name)
    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        micro_func = micro_mod[func_name]
        ctx = tvm.micro_dev(0)
        a_np = np.random.uniform(size=shape).astype(dtype)
        a = tvm.nd.array(a_np, ctx)
        c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx)
        micro_func(a, c)
        # ensure input wasn't corrupted
        tvm.testing.assert_allclose(a.asnumpy(), a_np)
        # ensure output is correct
        tvm.testing.assert_allclose(c.asnumpy(), a.asnumpy() + 2.0)

def test_graph_runtime():
    """Test a program which uses the graph runtime."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    # Construct Relay program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    xx = relay.multiply(x, x)
    z = relay.add(xx, relay.const(1.0))
    func = relay.Function([x], z)
    with micro.Session(DEV_CONFIG_A):
        mod = relay_micro_build(func, DEV_CONFIG_A)
        x_in = np.random.uniform(size=shape[0]).astype(dtype)
        mod.run(x=x_in)
        result = mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(mod.get_input(0).asnumpy(), x_in)
        tvm.testing.assert_allclose(result, x_in * x_in + 1.0)

def test_conv2d():
    """Test a conv2d operator compiled for the device."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    from tvm.relay import create_executor
    from tvm.relay import transform

    dshape = (1, 4, 16, 16)
    dtype = "int8"
    func_name = "fused_nn_conv2d"
    reset_gdbinit()
    # Construct Relay program.
    x = relay.var("x", shape=dshape, dtype=dtype)
    conv_expr = relay.nn.conv2d(x, relay.var("w"), kernel_size=(3, 3), padding=(1, 1), channels=4)
    func = relay.Function(relay.analysis.free_vars(conv_expr), conv_expr)
    mod = tvm.IRModule.from_expr(func)
    mod = transform.InferType()(mod)
    x_shape = list(map(lambda x: x.value, mod["main"].params[0].checked_type.shape))
    w_shape = list(map(lambda x: x.value, mod["main"].params[1].checked_type.shape))
    out_shape = list(map(lambda x: x.value, mod["main"].ret_type.shape))
    with tvm.transform.PassContext(config={"tir.disable_vectorize": True}):
        graph, c_mod, params = relay.build(mod, target="c")
    with micro.Session(DEV_CONFIG_A):
        micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A)
        # relay.build may suffix the fused function name (e.g.
        # "fused_nn_conv2d_1"), so probe candidate names until one resolves.
        candidate_func_name = func_name
        for i in range(100):
            try:
                micro_func = micro_mod[candidate_func_name]
                break
            except tvm.TVMError:
                candidate_func_name = f"{func_name}_{i}"
        else:
            assert False, f"could not find {func_name} in the built module"
        ctx = tvm.micro_dev(0)
        x_data = tvm.nd.array(np.random.uniform(size=x_shape).astype(dtype), ctx)
        w_data = tvm.nd.array(np.random.uniform(size=w_shape).astype(dtype), ctx)
        result = tvm.nd.array(np.zeros(shape=out_shape, dtype=dtype), ctx)
        micro_func(x_data, w_data, result)
        # Check against a reference result from the host debug executor.
        intrp = create_executor("debug")
        expected_result = intrp.evaluate(mod["main"])(x_data, w_data)
        tvm.testing.assert_allclose(result.asnumpy(), expected_result.asnumpy())

def test_interleave_sessions():
    """Test closing and reopening sessions."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    # Construct Relay add program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    add_const_func = relay.Function([x], ret)
    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
    with sess_b:
        np_tensor_b = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
    with sess_a:
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)
    with sess_b:
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_B)
        add_const_mod.run(x=micro_tensor_b)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_b + 1.0)

def test_nested_sessions():
    """Test entering and exiting nested session contexts."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    # Construct Relay add program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    add_const_func = relay.Function([x], ret)
    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
        with sess_b:
            np_tensor_b = np.random.uniform(size=shape).astype(dtype)
            micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0))
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)

def test_inactive_session_use():
    """Test the use of objects allocated in a session that is no longer active."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    # Construct Relay add program.
    x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype))
    ret = relay.add(x, relay.const(1.0))
    add_const_func = relay.Function([x], ret)
    sess_a = micro.Session(DEV_CONFIG_A)
    sess_b = micro.Session(DEV_CONFIG_B)
    with sess_a:
        np_tensor_a = np.random.uniform(size=shape).astype(dtype)
        micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0))
        add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A)
    with sess_b:
        # These objects belong to `sess_a`.
        add_const_mod.run(x=micro_tensor_a)
        add_result = add_const_mod.get_output(0).asnumpy()
        tvm.testing.assert_allclose(add_result, np_tensor_a + 1.0)

# TODO add workspace alloc/free stress test
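# A minimal sketch of what that stress test might look like (hypothetical and
# not wired into __main__ below; it only hammers repeated tensor allocation
# inside one session, since workspace alloc/free has no direct Python-level
# API here):
def test_alloc_stress_sketch():
    """Repeatedly allocate and drop device tensors within one session."""
    if not tvm.runtime.enabled("micro_dev"):
        return
    shape = (1024,)
    dtype = "float32"
    with micro.Session(DEV_CONFIG_A):
        ctx = tvm.micro_dev(0)
        for _ in range(32):
            # Each iteration claims device memory, round-trips it, and lets
            # the handle go out of scope.
            np_tensor = np.random.uniform(size=shape).astype(dtype)
            micro_tensor = tvm.nd.array(np_tensor, ctx)
            tvm.testing.assert_allclose(np_tensor, micro_tensor.asnumpy())
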
if __name__ == "__main__":
    test_alloc()
    print()
    print("finished alloc test")
    input("[press enter to continue]")
    test_add()
    print()
    print("finished add test")
    input("[press enter to continue]")
    test_workspace_add()
    print()
    print("finished workspace add test")
    input("[press enter to continue]")
    test_graph_runtime()
    print()
    print("finished graph runtime test")
    input("[press enter to continue]")
    test_conv2d()
    print()
    print("finished conv2d test")
    input("[press enter to continue]")
    # disabled for now, as these tests are currently broken
    # test_interleave_sessions()
    # print()
    # print("finished interleaved sessions test")
    # input("[press enter to continue]")
    # test_nested_sessions()
    # print()
    # print("finished nested sessions test")
    # input("[press enter to continue]")
    test_inactive_session_use()
    print()
    print("finished use inactive session test")
    input("[press enter to continue]")