| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| """ |
| Test the tuner |
| """ |
| import logging |
| import time |
| |
| import tvm |
| from tvm import te |
| |
| from tvm import autotvm |
| from tvm.autotvm.tuner import RandomTuner |
| |
| import tvm.testing |
| |
| |
| @autotvm.template("testing/conv2d_no_batching") |
| def conv2d_no_batching(N, H, W, CI, CO, KH, KW): |
| """An example template for testing""" |
| assert N == 1, "Only consider batch_size = 1 in this template" |
| |
| data = te.placeholder((N, CI, H, W), name="data") |
| kernel = te.placeholder((CO, CI, KH, KW), name="kernel") |
| |
| rc = te.reduce_axis((0, CI), name="rc") |
| ry = te.reduce_axis((0, KH), name="ry") |
| rx = te.reduce_axis((0, KW), name="rx") |
| |
| conv = te.compute( |
| (N, CO, H - KH + 1, W - KW + 1), |
| lambda nn, ff, yy, xx: te.sum( |
| data[nn, rc, yy + ry, xx + rx] * kernel[ff, rc, ry, rx], axis=[rc, ry, rx] |
| ), |
| tag="conv2d_nchw", |
| ) |
| |
| s = te.create_schedule([conv.op]) |
| |
| output = conv |
| OL = s.cache_write(conv, "local") |
| |
| # create cache stage |
| AA = s.cache_read(data, "shared", [OL]) |
| WW = s.cache_read(kernel, "shared", [OL]) |
| AL = s.cache_read(AA, "local", [OL]) |
| WL = s.cache_read(WW, "local", [OL]) |
| |
| # tile and bind spatial axes |
| n, f, y, x = s[output].op.axis |
| cfg = autotvm.get_config() |
| cfg.define_split("tile_f", cfg.axis(f), num_outputs=4) |
| cfg.define_split("tile_y", cfg.axis(y), num_outputs=4) |
| cfg.define_split("tile_x", cfg.axis(x), num_outputs=4) |
| bf, vf, tf, fi = cfg["tile_f"].apply(s, output, f) |
| by, vy, ty, yi = cfg["tile_y"].apply(s, output, y) |
| bx, vx, tx, xi = cfg["tile_x"].apply(s, output, x) |
| kernel_scope = n # this is the scope to attach global config inside this kernel |
| |
| s[output].bind(bf, te.thread_axis("blockIdx.z")) |
| s[output].bind(by, te.thread_axis("blockIdx.y")) |
| s[output].bind(bx, te.thread_axis("blockIdx.x")) |
| s[output].bind(vf, te.thread_axis("vthread")) |
| s[output].bind(vy, te.thread_axis("vthread")) |
| s[output].bind(vx, te.thread_axis("vthread")) |
| s[output].bind(tf, te.thread_axis("threadIdx.z")) |
| s[output].bind(ty, te.thread_axis("threadIdx.y")) |
| s[output].bind(tx, te.thread_axis("threadIdx.x")) |
| s[output].reorder(n, bf, by, bx, vf, vy, vx, tf, ty, tx, fi, yi, xi) |
| s[OL].compute_at(s[output], tx) |
| |
| # tile and bind reduction axes |
| n, f, y, x = s[OL].op.axis |
| rc, ry, rx = s[OL].op.reduce_axis |
| cfg.define_split("tile_rc", cfg.axis(rc), num_outputs=3) |
| cfg.define_split("tile_ry", cfg.axis(ry), num_outputs=3) |
| cfg.define_split("tile_rx", cfg.axis(rx), num_outputs=3) |
| rco, rcm, rci = cfg["tile_rc"].apply(s, OL, rc) |
| ryo, rym, ryi = cfg["tile_rx"].apply(s, OL, ry) |
| rxo, rxm, rxi = cfg["tile_ry"].apply(s, OL, rx) |
| s[OL].reorder(rco, ryo, rxo, rcm, rym, rxm, rci, ryi, rxi, n, f, y, x) |
| |
| s[AA].compute_at(s[OL], rxo) |
| s[WW].compute_at(s[OL], rxo) |
| s[AL].compute_at(s[OL], rxm) |
| s[WL].compute_at(s[OL], rxm) |
| |
| # cooperative fetching |
| for load in [AA, WW]: |
| n, f, y, x = s[load].op.axis |
| fused = s[load].fuse(n, f, y, x) |
| tz, fused = s[load].split(fused, nparts=cfg["tile_f"].size[2]) |
| ty, fused = s[load].split(fused, nparts=cfg["tile_y"].size[2]) |
| tx, fused = s[load].split(fused, nparts=cfg["tile_x"].size[2]) |
| s[load].bind(tz, te.thread_axis("threadIdx.z")) |
| s[load].bind(ty, te.thread_axis("threadIdx.y")) |
| s[load].bind(tx, te.thread_axis("threadIdx.x")) |
| |
| # tune unroll |
| cfg.define_knob("auto_unroll_max_step", [0, 512, 1500]) |
| cfg.define_knob("unroll_explicit", [0, 1]) |
| s[output].pragma(kernel_scope, "auto_unroll_max_step", cfg["auto_unroll_max_step"].val) |
| s[output].pragma(kernel_scope, "unroll_explicit", cfg["unroll_explicit"].val) |
| |
| return s, [data, kernel, conv] |
| |
| |
| def get_sample_task(target=tvm.target.cuda(), target_host=None): |
| """return a sample task for testing""" |
| task = autotvm.task.create( |
| "testing/conv2d_no_batching", |
| args=(1, 7, 7, 512, 512, 3, 3), |
| target=target, |
| target_host=target_host, |
| ) |
| return task, target |
| |
| |
| @tvm.testing.parametrize_targets("cuda", "opencl") |
| def test_tuning(target, ctx): |
| # init task |
| task, target = get_sample_task(target, None) |
| logging.info("%s", task.config_space) |
| |
| measure_option = autotvm.measure_option(autotvm.LocalBuilder(), autotvm.LocalRunner()) |
| |
| tuner = RandomTuner(task) |
| tuner.tune(n_trial=20, measure_option=measure_option) |
| |
| |
| if __name__ == "__main__": |
| # only print log when invoked from main |
| logging.basicConfig(level=logging.DEBUG) |
| |
| test_tuning() |