blob: 75a76f02ab40e3e098aaa55ae2c6e5397759949a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import tvm
import tvm.testing
from tvm import te
import numpy as np
from tvm.contrib import mps
@tvm.testing.requires_metal
def test_matmul():
    """Run ``mps.matmul`` followed by an element-wise ``+ 1.0`` on Metal.

    A (1024, 128) by (128, 256) product is created with ``mps.matmul``; a
    ``te.compute`` stage adds 1.0 to every element. That stage is split by 16
    along both axes and bound to blockIdx/threadIdx, built for the "metal"
    target, run on ``tvm.metal(0)``, and compared against ``np.dot(...) + 1``.

    If the ``tvm.contrib.mps.matmul`` global function is not registered, a
    message is printed and the test returns without building anything.
    """
    rows, inner, cols = 1024, 128, 256

    lhs = te.placeholder((rows, inner), name="A")
    rhs = te.placeholder((inner, cols), name="B")
    product = mps.matmul(lhs, rhs)
    result = te.compute(product.shape, lambda *idx: product(*idx) + 1.0)

    sched = te.create_schedule(result.op)
    row_axis, col_axis = result.op.axis

    # Thread axes are created in the same order as before: block y/x first,
    # then thread y/x.
    axis_tags = ("blockIdx.y", "blockIdx.x", "threadIdx.y", "threadIdx.x")
    blk_y, blk_x, thr_y, thr_x = [te.thread_axis(tag) for tag in axis_tags]

    row_outer, row_inner = sched[result].split(row_axis, factor=16)
    col_outer, col_inner = sched[result].split(col_axis, factor=16)

    # Outer loops map to blocks, inner loops to threads.
    bindings = (
        (row_outer, blk_y),
        (col_outer, blk_x),
        (row_inner, thr_y),
        (col_inner, thr_x),
    )
    for loop_var, hw_axis in bindings:
        sched[result].bind(loop_var, hw_axis)

    # The extern function is looked up with allow_missing=True so that an
    # unavailable MPS contrib library results in a skip message, not an error.
    if not tvm.get_global_func("tvm.contrib.mps.matmul", True):
        print("skip because extern function is not available")
        return

    dev = tvm.metal(0)
    func = tvm.build(sched, [lhs, rhs, result], "metal")

    a_host = np.random.uniform(size=(rows, inner)).astype(lhs.dtype)
    a_dev = tvm.nd.array(a_host, dev)
    b_host = np.random.uniform(size=(inner, cols)).astype(rhs.dtype)
    b_dev = tvm.nd.array(b_host, dev)
    out_dev = tvm.nd.array(np.zeros((rows, cols), dtype=product.dtype), dev)

    func(a_dev, b_dev, out_dev)

    actual = out_dev.asnumpy()
    expected = np.dot(a_dev.asnumpy(), b_dev.asnumpy()) + 1
    tvm.testing.assert_allclose(actual, expected, rtol=1e-5)
@tvm.testing.requires_metal
def test_conv2d():
    """Build and run the ``mps.conv2d`` extern op once on a Metal device.

    The input placeholder has shape (n, h, w, ci) and the weight placeholder
    has shape (co, kh, kw, ci). The op is created with "SAME" padding and the
    local ``stride``, and the output buffer is sized
    (n, h // stride, w // stride, co).

    If the ``tvm.contrib.mps.conv2d`` global function is not registered, a
    message is printed and the test returns without building anything.

    NOTE(review): the result is not compared against a reference
    implementation; this test only checks that the build and the call
    complete without raising.
    """
    n = 1
    h = 14
    w = 14
    ci = 2
    co = 4
    kh = 3
    kw = 3
    stride = 2
    A = te.placeholder((n, h, w, ci), name="x")
    B = te.placeholder((co, kh, kw, ci), name="w")
    # Use the same `stride` that sizes the output buffer below, so the op and
    # the buffer cannot drift apart.
    C = mps.conv2d(A, B, "SAME", stride)
    s1 = te.create_schedule(C.op)

    def verify(A, B, C, s, target="metal"):
        """Build schedule ``s`` for ``target`` and run it on ``tvm.metal(0)``."""
        if not tvm.get_global_func("tvm.contrib.mps.conv2d", True):
            print("skip because extern function is not available")
            return
        ctx = tvm.metal(0)
        f = tvm.build(s, [A, B, C], target)
        a = tvm.nd.array(np.random.uniform(size=(n, h, w, ci)).astype(A.dtype), ctx)
        b = tvm.nd.array(np.random.uniform(size=(co, kh, kw, ci)).astype(B.dtype), ctx)
        c = tvm.nd.array(np.zeros((n, h // stride, w // stride, co), dtype=C.dtype), ctx)
        f(a, b, c)

    # The schedule is now received as `s`; previously it landed in a
    # parameter named `target` and was ignored.
    verify(A, B, C, s1)
if __name__ == "__main__":
    # Script entry point: running this file directly invokes only the conv2d
    # test.
    # NOTE(review): the matmul call below is left commented out as found; the
    # reason is not recorded in this file. Confirm whether it should be
    # re-enabled.
    # test_matmul()
    test_conv2d()