blob: c498dacfbdcfe0e278f496ff8746543bfba5d99d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test rpc based launcher for hexagon"""
import tempfile
import numpy as np
import pytest
import tvm.testing
import tvm.topi.testing
from tvm import te
from tvm.contrib.hexagon.meta_schedule import (
get_hexagon_local_builder,
get_hexagon_rpc_runner,
)
from tvm.s_tir import meta_schedule as ms
from tvm.s_tir.meta_schedule import postproc, schedule_rule
from tvm.s_tir.meta_schedule.arg_info import TensorInfo
from tvm.s_tir.meta_schedule.builder import BuilderInput
from tvm.s_tir.meta_schedule.runner import RunnerInput
from tvm.s_tir.tensor_intrin.hexagon import VRMPY_u8u8i32_INTRIN
from tvm.script import tir as T
from tvm.tir import FloatImm
from .infrastructure import get_hexagon_target
# Square-matrix dimension of the TVMScript matmul workload below
# (used by test_builder_runner's TensorInfo shapes).
MATMUL_N = 16
# NOTE(review): MATMUL_M is not referenced anywhere in this file —
# presumably kept for parity with sibling tests; confirm before removing.
MATMUL_M = 32
@tvm.script.ir_module
class MatmulModule:
    """A 16x16x16 float32 matmul expressed in TVMScript; serves as the
    workload for the builder/runner smoke test below."""

    # pylint: disable=no-self-argument
    @T.prim_func
    def main(a: T.handle, b: T.handle, c: T.handle) -> None:  # type: ignore
        # pylint: disable=missing-function-docstring
        T.func_attr({"global_symbol": "main", "tir.noalias": True})
        a_buffer = T.match_buffer(a, (16, 16), "float32")
        b_buffer = T.match_buffer(b, (16, 16), "float32")
        c_buffer = T.match_buffer(c, (16, 16), "float32")
        # C[i, j] = sum_k A[i, k] * B[k, j], with C zero-initialized in T.init().
        for i, j, k in T.grid(16, 16, 16):
            with T.sblock("matmul"):
                vi_axis, vj_axis, vk_axis = T.axis.remap("SSR", [i, j, k])
                with T.init():
                    c_buffer[vi_axis, vj_axis] = 0.0  # type: ignore
                c_buffer[vi_axis, vj_axis] = (
                    c_buffer[vi_axis, vj_axis]
                    + a_buffer[vi_axis, vk_axis] * b_buffer[vk_axis, vj_axis]
                )
@tvm.testing.requires_hexagon
def test_builder_runner(hexagon_launcher):
    """Smoke-test the Hexagon local builder and RPC runner on the matmul module."""
    if hexagon_launcher.is_simulator():
        pytest.skip("Tuning on simulator not supported.")

    workers = 4
    builder = get_hexagon_local_builder(max_workers=workers)
    runner = get_hexagon_rpc_runner(
        hexagon_launcher, number=1, repeat=1, min_repeat_ms=0, max_workers=workers
    )

    # Build for Hexagon and make sure the builder reported no error.
    (build_res,) = builder.build([BuilderInput(MatmulModule, get_hexagon_target("v68"))])
    assert build_res.artifact_path is not None
    assert build_res.error_msg is None

    # Three 16x16 float32 tensors: A, B, and the output C.
    arg_infos = [TensorInfo("float32", (MATMUL_N, MATMUL_N)) for _ in range(3)]
    runner_input = RunnerInput(build_res.artifact_path, "llvm", arg_infos)

    # Run the module remotely and validate the timing results.
    (runner_future,) = runner.run([runner_input])
    run_res = runner_future.result()
    assert run_res.error_msg is None
    for sec in run_res.run_secs:
        value = sec.value if isinstance(sec, FloatImm) else sec
        assert isinstance(value, float)
        assert value >= 0.0
def dense_compute(m, n, k):
    """Declare a u8 x u8 -> i32 dense op whose weight operand is pre-packed
    into (n // 32, k // 4, 32, 4) blocks, matching the VRMPY tile shape.

    Returns the list [data, packed weight, output] of te tensors.
    """
    data = te.placeholder((m, k), name="X", dtype="uint8")
    weight = te.placeholder((n // 32, k // 4, 32, 4), name="packed_width", dtype="uint8")
    red = te.reduce_axis((0, k), name="k")

    def _fcompute(i, j):
        # Widen both u8 operands to int32 before accumulating.
        return te.sum(
            data[i, red].astype("int32")
            * weight[
                tvm.tir.indexdiv(j, 32), tvm.tir.indexdiv(red, 4), j % 32, red % 4
            ].astype("int32"),
            axis=red,
        )

    return [data, weight, te.compute((m, n), _fcompute, name="compute")]
def schedule_dense(sch, block, m_size, do_tune):
    """dense schedule

    Tiles the dense block into a (y, x) grid whose innermost tile is
    32 output lanes by 4 reduction elements (the VRMPY shape), then
    parallelizes the fused outer tiles and tensorizes the inner tile.

    Parameters
    ----------
    sch : schedule, mutated in place
    block : the "compute" block inside `sch`
    m_size : static M extent; only read when `do_tune` is False
        (callers pass None while tuning)
    do_tune : when True, sample the y-split factors so meta-schedule
        can tune them
    """
    a_y, a_x, _ = sch.get_loops(block)[-3:]
    if do_tune:
        # Let the tuner pick the y tiling.
        y_factors = sch.sample_perfect_tile(a_y, n=2, max_innermost_factor=128)
        a_yo, a_yi = sch.split(a_y, factors=y_factors)
    else:
        # Fixed y tile of at most 32 rows.
        a_yo, a_yi = sch.split(a_y, factors=[None, min(m_size, 32)])
    # Inner x tile of 32 output lanes.
    a_xo, a_xi = sch.split(a_x, factors=[None, 32])
    sch.reorder(a_yo, a_xo, a_yi, a_xi)
    # Split the reduction by 4, matching the packed-weight inner dim.
    a_xi, a_k = sch.get_loops(block)[-2:]
    a_ko, a_ki = sch.split(a_k, factors=[None, 4])
    sch.reorder(a_ko, a_xi, a_ki)
    # Parallelize across the fused outer (y, x) tiles.
    fused = sch.fuse(a_yo, a_xo)
    sch.parallel(fused)
    # Separate zero-init from the update: vectorize the init,
    # tensorize the update with the VRMPY intrinsic.
    dec = sch.decompose_reduction(block, a_ko)
    init_loop = sch.get_loops(dec)[-1]
    sch.vectorize(init_loop)
    sch.tensorize(a_xi, VRMPY_u8u8i32_INTRIN)
def verify_dense(sch, target, m_size, n_size, k_size, hexagon_session):
    """Verify dense operator.

    Compiles the scheduled module, runs it on the Hexagon session, checks
    the result against a NumPy reference matmul, then prints measured
    throughput from `time_evaluator`.
    """
    f = tvm.compile(sch.mod["main"], target=target)
    mod = hexagon_session.load_module(f)
    dev = hexagon_session.device

    a_np = np.random.uniform(1, 10, size=(m_size, k_size)).astype("uint8")
    b_np = np.random.uniform(1, 10, size=(n_size, k_size)).astype("uint8")
    # Reference: int32 accumulation of A @ B^T (B is stored row-major by n).
    c_np = np.dot(a_np.astype("int32"), b_np.transpose().astype("int32"))

    # Pack B into the (n//32, k//4, 32, 4) layout the kernel expects:
    #   pack_width[r, ko, s, t] == b_np[r * 32 + s, ko * 4 + t].
    # Equivalent to the original quadruple Python loop, but vectorized;
    # .copy() makes the transposed view contiguous before upload.
    pack_width = (
        b_np.reshape(n_size // 32, 32, k_size // 4, 4).transpose(0, 2, 1, 3).copy()
    )

    a = tvm.runtime.tensor(a_np, dev)
    b = tvm.runtime.tensor(pack_width, dev)
    c = tvm.runtime.tensor(np.zeros((m_size, n_size), dtype="int32"), dev)
    mod(a, b, c)
    np.testing.assert_equal(c.numpy(), c_np)

    # Report throughput: 2 ops (mul + add) per MAC.
    evaluator = mod.time_evaluator(mod.entry_name, dev, number=10)
    gflops = (n_size * m_size * k_size) * 2 / 1e9
    time_ms = evaluator(a, b, c).mean * 1e3
    print(f"{time_ms:f} ms, {gflops / (time_ms / 1e3):f} GOPS")
@tvm.testing.requires_hexagon
def test_vrmpy_dense(hexagon_launcher):
    """Test vector-reduce-multiply (VRMPY) dense via a hand-written schedule."""
    if hexagon_launcher.is_simulator():
        pytest.skip("Tuning on simulator not supported.")

    do_tune = True
    m_size, n_size, k_size = 128, 768, 768
    workload = te.create_prim_func(dense_compute(m_size, n_size, k_size))

    if do_tune:
        with tempfile.TemporaryDirectory() as work_dir:

            def _schedule_for_tune(sch):
                return schedule_dense(sch, sch.get_sblock("compute"), None, True)

            target = get_hexagon_target("v69")
            database = ms.tir_integration.tune_tir(
                mod=workload,
                target=target,
                work_dir=work_dir,
                max_trials_global=8,
                space=ms.space_generator.ScheduleFn(
                    _schedule_for_tune,
                    sch_rules=[],
                    postprocs=[],
                    mutator_probs={},
                ),
                strategy="replay-trace",
                builder=get_hexagon_local_builder(),
                runner=get_hexagon_rpc_runner(hexagon_launcher, number=10),
            )
            sch = ms.tir_integration.compile_tir(database, workload, target)
    else:
        # Apply the fixed schedule directly, without tuning.
        ir_module = tvm.IRModule({"main": workload})
        sch = tvm.s_tir.Schedule(ir_module)
        schedule_dense(sch, sch.get_sblock("compute"), m_size, do_tune)

    with hexagon_launcher.create_session() as session:
        verify_dense(sch, get_hexagon_target("v68"), m_size, n_size, k_size, session)
# This is an example of a schedule found by vrmpy auto tensorization.
# It gets 440 GFLOPS on SD888.
@tvm.script.ir_module
class ModuleVRMPYAutoTensorize:
    """Vector reduce multiply (VRMPY) auto-tensorized dense test class."""

    # pylint: disable=no-self-argument
    @T.prim_func
    def main(  # type: ignore
        X: T.Buffer((128, 768), "uint8"),  # type: ignore
        packed_width: T.Buffer((24, 192, 32, 4), "uint8"),  # type: ignore
        compute: T.Buffer((128, 768), "int32"),  # type: ignore
    ) -> None:
        # pylint: disable=missing-function-docstring
        T.func_attr({"global_symbol": "main", "tir.noalias": True})
        # Fused outer parallel loop over 64 y-tiles (128 / 2) x 8 x-tiles
        # (24 / 3) = 512 iterations; the `// 8` / `% 8` below recover the
        # two tile indices.
        for i0_0_i1_0_0_fused in T.parallel(
            512, annotations={"pragma_auto_unroll_max_step": 64, "pragma_unroll_explicit": 1}
        ):
            # Phase 1: zero-initialize each 32-wide output tile.
            for i0_1_init, i1_0_1_init, i0_2_init, i1_0_2_init in T.grid(2, 3, 1, 1):
                with T.sblock("compute_o_init"):
                    i = T.axis.spatial(128, i0_0_i1_0_0_fused // 8 * 2 + i0_1_init + i0_2_init)
                    j_o = T.axis.spatial(24, i1_0_2_init + i0_0_i1_0_0_fused % 8 * 3 + i1_0_1_init)
                    T.reads()
                    T.writes(compute[i, j_o * 32 : j_o * 32 + 32])  # type: ignore
                    for i1_1 in T.vectorized(32):
                        with T.sblock("compute_init"):
                            j_i_init = T.axis.spatial(32, i1_1)
                            T.reads()
                            T.writes(compute[i, j_o * 32 + j_i_init])
                            compute[i, j_o * 32 + j_i_init] = 0  # type: ignore
            # Phase 2: reduction over k in groups of 4 bytes (k_o covers
            # 192 = 768 / 4 reduction chunks).
            for i2_0_0, i0_1, i1_0_1, i2_0_1, i0_2, i1_0_2 in T.grid(32, 2, 3, 6, 1, 1):
                with T.sblock("compute_o_update"):
                    i = T.axis.spatial(128, i0_0_i1_0_0_fused // 8 * 2 + i0_1 + i0_2)
                    j_o = T.axis.spatial(24, i1_0_2 + i0_0_i1_0_0_fused % 8 * 3 + i1_0_1)
                    k_o = T.axis.reduce(192, i2_0_0 * 6 + i2_0_1)
                    T.reads(
                        compute[i, j_o * 32 : j_o * 32 + 32],  # type: ignore
                        X[i, k_o * 4 : k_o * 4 + 4],  # type: ignore
                        packed_width[j_o, k_o, 0:32, 0:4],  # type: ignore
                    )
                    T.writes(compute[i, j_o * 32 : j_o * 32 + 32])  # type: ignore
                    # View the 4 activation bytes, the 32x4 weight tile, and
                    # the 32-lane accumulator as flat buffers for the intrinsic.
                    a_buffer = T.match_buffer(
                        X[i, k_o * 4 : k_o * 4 + 4],
                        [4],
                        dtype="uint8",
                        offset_factor=1,  # type: ignore
                    )
                    b_buffer = T.match_buffer(
                        packed_width[j_o, k_o, 0:32, 0:4], [32, 4], dtype="uint8", offset_factor=1
                    )
                    c_buffer = T.match_buffer(
                        compute[i, j_o * 32 : j_o * 32 + 32],
                        [32],
                        dtype="int32",
                        offset_factor=1,  # type: ignore
                    )
                    # Reinterpret 4 x u8 activations as one i32, and the
                    # 128-byte weight tile as i32x32, as VRMPY expects.
                    a_u8x4: T.uint8x4 = a_buffer[0:4]  # type: ignore
                    a_i32: T.int32 = T.reinterpret(a_u8x4, dtype="int32")  # type: ignore
                    b_i32x32: T.int32x32 = T.reinterpret(b_buffer[0, 0:128], dtype="int32x32")  # type: ignore
                    # NOTE(review): 4390 is a hard-coded LLVM intrinsic id —
                    # presumably the Hexagon VRMPY accumulate intrinsic;
                    # confirm it matches the LLVM version in use, since
                    # intrinsic ids are not stable across LLVM releases.
                    c_buffer[0:32] = T.call_llvm_pure_intrin(  # type: ignore
                        4390, c_buffer[0:32], b_i32x32, a_i32, dtype="int32x32"
                    )
@tvm.testing.requires_hexagon
def test_vrmpy_dense_auto_tensorize(hexagon_launcher):
    """Test VRMPY dense operator."""
    if hexagon_launcher.is_simulator():
        pytest.skip("Tuning on simulator not supported.")

    m_size, n_size, k_size = 128, 768, 768
    workload = te.create_prim_func(dense_compute(m_size, n_size, k_size))

    # Search-space rules: tile to the VRMPY intrinsic, then
    # parallelize / vectorize / unroll the result.
    tiling_rule = schedule_rule.MultiLevelTilingWithIntrin(
        VRMPY_u8u8i32_INTRIN,
        structure="SRSRS",
        tile_binds=None,
        max_innermost_factor=64,
        vector_load_lens=None,
        reuse_read=None,
        reuse_write=schedule_rule.ReuseType(
            req="may",
            levels=[1, 2],
            scope="global",
        ),
    )
    unroll_rule = schedule_rule.ParallelizeVectorizeUnroll(
        max_jobs_per_core=16,
        max_vectorize_extent=128,
        unroll_max_steps=[0, 16, 64, 512],
        unroll_explicit=True,
    )
    postprocs = [
        postproc.RewriteParallelVectorizeUnroll(),
        postproc.RewriteReductionBlock(),
        postproc.RewriteTensorize(vectorize_init_loop=True),
    ]

    # Make this to False to compile and run the best tuned schedule
    run_tuning = True
    if not run_tuning:
        sch = tvm.s_tir.Schedule(ModuleVRMPYAutoTensorize, debug_mask="all")
    else:
        with tempfile.TemporaryDirectory() as work_dir:
            target = get_hexagon_target("v68")
            database = ms.tir_integration.tune_tir(
                mod=workload,
                target=target,
                max_trials_global=8,
                num_trials_per_iter=8,
                work_dir=work_dir,
                space=ms.space_generator.PostOrderApply(
                    f_block_filter=None,
                    sch_rules=[tiling_rule, unroll_rule],
                    postprocs=postprocs,
                    mutator_probs={},
                ),
                builder=get_hexagon_local_builder(),
                runner=get_hexagon_rpc_runner(hexagon_launcher, number=10),
            )
            sch = ms.tir_integration.compile_tir(database, workload, target)

    with hexagon_launcher.create_session() as session:
        verify_dense(sch, get_hexagon_target("v68"), m_size, n_size, k_size, session)
# Allow running this file directly (outside pytest collection).
if __name__ == "__main__":
    tvm.testing.main()