# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-name
# pylint: disable=too-many-nested-blocks
"""Device config class to hold information about the target hardware"""
from typing import List, Dict, Optional
from functools import reduce
import math
import numpy as np
import tvm
from . import BlockConfig
from . import StripeConfig
from . import Propagator
def _round_up(a: int, b: int) -> int:
"""Round up to a multiple of b"""
return ((a + b - 1) // b) * b
def _round_up_div(a: int, b: int) -> int:
"""Divide by b and round up to a multiple of b"""
return (a + b - 1) // b
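# Worked examples (illustrative):
#   _round_up(20, 16) == 32   # pad 20 channels up to the next multiple of 16
#   _round_up_div(9, 4) == 3  # 9 kernel elements packed 4 at a time need 3 steps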
class _Shape:
"""Helper class for dealing with Tensor shapes of different layouts"""
def __init__(self, shape: List[int], layout="NHWC"):
if layout == "NHCWB16":
self.height = int(shape[1])
self.width = int(shape[3])
self.depth = int(shape[2]) * int(shape[4])
else:
# identity layout is NHWC, but the shape does not always have 4 dimensions
length = len(shape)
if length == 4:
self.height = int(shape[1])
self.width = int(shape[2])
self.depth = int(shape[3])
elif length == 3:
self.height = int(shape[0])
self.width = int(shape[1])
self.depth = int(shape[2])
elif length == 2:
self.height = int(shape[0])
self.width = int(shape[1])
self.depth = 1
elif length == 1:
self.height = int(shape[0])
self.width = 1
self.depth = 1
def round_up(self, other: "_Shape"):
self.height = _round_up(self.height, other.height)
self.width = _round_up(self.width, other.width)
self.depth = _round_up(self.depth, other.depth)
def area(self) -> int:
return self.height * self.width
def as_list(self):
return [1, self.height, self.width, self.depth]
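# Illustrative example: the same feature map seen through both layouts.
#   _Shape([1, 8, 8, 20], "NHWC")        -> height=8, width=8, depth=20
#   _Shape([1, 8, 2, 8, 16], "NHCWB16")  -> height=8, width=8, depth=2*16=32
# NHCWB16 stores the channel axis as ceil(C/16) bricks of 16 values, so the
# depth is effectively rounded up to a multiple of 16.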
class EthosuDeviceConfig:
"""Arm(R) Ethos(TM)-U NPU config class"""
def __init__(self, device: str, disable_block_culling: bool = False):
self._device = device
self._subkernel_limits = (8, 8)
self._output_cycles = (1, 2, 3, 4, 6)
self._split_depth = 16
self._max_block_shape = _Shape([1, 32, 64, 128])
self._bank_size_bytes = 1024
self._disable_block_culling = disable_block_culling
if self._device == "ethos-u55-256":
self._micro_block = _Shape([1, 2, 2, 8])
self._input_micro_block = _Shape([1, 2, 2, 8])
self._delay_cycles = (2, 2)
self._activation_cycles = (0.25, 1)
self._output_units = 8
self._total_banks = 48
self._reserved_banks = 4
self._input_granularity = {1: 8, 2: 8, 4: 16}
self._accumulator_granularity = {4: 16, 5: 20}
self._lut_reserved = True
elif self._device == "ethos-u55-128":
self._micro_block = _Shape([1, 1, 2, 8])
self._input_micro_block = _Shape([1, 1, 2, 8])
self._delay_cycles = (2, 3)
self._activation_cycles = (0.5, 1)
self._output_units = 4
self._total_banks = 24
self._reserved_banks = 4
self._input_granularity = {1: 4, 2: 4, 4: 8}
self._accumulator_granularity = {4: 8, 5: 12}
self._lut_reserved = True
elif self._device == "ethos-u55-64":
self._micro_block = _Shape([1, 1, 1, 8])
self._input_micro_block = _Shape([1, 1, 1, 8])
self._delay_cycles = (2, 3)
self._activation_cycles = (1, 1)
self._output_units = 2
self._total_banks = 16
self._reserved_banks = 2
self._input_granularity = {1: 2, 2: 2, 4: 4}
self._accumulator_granularity = {4: 4, 5: 8}
self._lut_reserved = False
elif self._device == "ethos-u55-32":
self._micro_block = _Shape([1, 1, 1, 4])
self._input_micro_block = _Shape([1, 1, 1, 8])
self._delay_cycles = (3, 7)
self._activation_cycles = (1, 2)
self._output_units = 1
self._total_banks = 16
self._reserved_banks = 2
self._input_granularity = {1: 2, 2: 2, 4: 4}
self._accumulator_granularity = {4: 4, 5: 4}
self._lut_reserved = False
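# Illustrative sizing note: for "ethos-u55-128" the SHRAM pool is 24 banks of
# 1024 bytes, of which 4 are reserved, leaving 20 banks to be shared between
# the input block(s) and the accumulator block when block configs are sized
# below.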
def _get_output_cycles(
self, op_type: str, op_str: str, ifm_dtype: str, ofm_dtype: str, activation: str
) -> float:
"""Estimate cycles per output element for an NPU operator
Parameters
----------
op_type : str
The NPU primitive operator
"ethosu_pooling"
op_str : str
The type of NPU operator.
"MAX"
ifm_dtype: str
Datatype of the Input Feature Map tensor (IFM)
ofm_dtype: str
Datatype of the Output Feature Map tensor (OFM)
activation : str
The activation function to use.
"NONE" - no activation function.
"CLIP" - clip the output between clip_min and clip_max.
"TANH" - tanh activation function.
"SIGMOID" - sigmoid activation function.
"LUT" - use a look-up table to perform the activation function.
Returns
-------
float
The cycles per output element
"""
cycles = 0
bw_limit = 0
if op_type == "ethosu_pooling" and op_str == "MAX":
cycles = self._output_cycles[0]
elif op_type in ("ethosu_pooling", "ethosu_conv2d", "ethosu_depthwise_conv2d"):
cycles = self._output_cycles[1] if ifm_dtype == "int8" else self._output_cycles[2]
elif op_type == "ethosu_binary_elementwise":
# Binary Bandwidth Limitations
if ifm_dtype == "int8":
bw_limit = 0.125 if ofm_dtype == "int8" else 0.75
elif ifm_dtype == "int16":
bw_limit = 0.75 if ofm_dtype == "int16" else 1
else:
bw_limit = 1.5
if op_str in ("MIN", "MAX"):
cycles = self._output_cycles[1]
elif op_str == "MUL":
cycles = self._output_cycles[2]
if op_str in ("ADD", "SUB"):
if ofm_dtype == "int32":
cycles = (
self._output_cycles[2] if ifm_dtype == "int32" else self._output_cycles[3]
)
else:
cycles = self._output_cycles[4]
elif op_type == "ethosu_unary_elementwise":
# Unary Bandwidth Limitations
if ifm_dtype == "int16":
bw_limit = 0.25
elif ifm_dtype == "int32":
bw_limit = 1
if op_str == "CLZ":
cycles = self._output_cycles[1]
elif op_str in ("SHL", "SHR"):
cycles = self._output_cycles[2]
elif op_str in ("LRELU", "ABS"):
cycles = self._output_cycles[1]
if ifm_dtype == "int16":
bw_limit = 0.5
act_cycles = 0
if activation == "CLIP":
act_cycles = self._activation_cycles[0]
elif activation in ("LUT", "TANH", "SIGMOID"):
act_cycles = self._activation_cycles[1]
return max((cycles / self._output_units), act_cycles, bw_limit)
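# Worked example (ethos-u55-256): an int8 "ethosu_conv2d" with a CLIP
# activation gives cycles = 2 and bw_limit = 0, so the result is
# max(2 / 8 output units, 0.25 activation cycles, 0) = 0.25 cycles per element.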
def _get_delay_cycles(self, op_type: str, ifm_dtype: str) -> int:
"""Get the number of delay cycles during a bubble
Parameters
----------
op_type : str
The NPU primitive operator
"ethosu_pooling"
ifm_dtype: str
Datatype of the Input Feature Map tensor (IFM)
Returns
-------
int
The number of delay cycles
"""
if op_type in ("ethosu_conv2d", "ethosu_depthwise2d", "ethosu_pooling"):
if ifm_dtype == "int16":
return self._delay_cycles[1]
return self._delay_cycles[0]
return 0
def _get_weight_decoder_cycles(self, op_type: str) -> int:
"""Get cycle estimate for weight decoding
Parameters
----------
op_type: str
The NPU primitive operator
"ethosu_pooling"
Returns
-------
int
Estimated cycles for weight decoding
"""
if op_type in ("ethosu_conv2d", "ethosu_depthwise2d"):
return 32 * self._micro_block.depth // 8
return 0
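# Illustrative: on ethos-u55-256 the micro block depth is 8, so a conv2d is
# estimated at 32 * 8 // 8 = 32 weight decoder cycles; operators without
# weights return 0.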
def get_output_quantum(self, ofm_layout: str) -> List[int]:
"""Get the atomic output volume
Parameters
----------
ofm_layout : str
The layout of the Output Feature Map tensor. Can be "NHWC" or "NHCWB16".
Returns
-------
List[int]
The atomic output volume in the layout given by the ofm_layout parameter
"""
if ofm_layout == "NHCWB16":
return [
1,
self._micro_block.height,
1,
self._micro_block.width,
self._micro_block.depth,
]
return self._micro_block.as_list()
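# Illustrative: on ethos-u55-256 the micro block is 1x2x2x8 (NHWC), so
# get_output_quantum("NHCWB16") returns [1, 2, 1, 2, 8].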
def _align(self, x: int, n: int) -> int:
return int(math.ceil(x / n) * n)
def _get_input_size(
self, output_size: int, kernel_stride: int, border: int, upscaling_factor: int
) -> int:
return int(math.ceil(((output_size - 1) * kernel_stride + border) / upscaling_factor))
def _get_dilated_kernel_size(self, kernel_size: int, dilation: int) -> int:
return (kernel_size - 1) * dilation + 1
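# Worked examples (illustrative):
#   _get_input_size(output_size=8, kernel_stride=2, border=3, upscaling_factor=1)
#       -> (8 - 1) * 2 + 3 = 17 input elements
#   _get_dilated_kernel_size(kernel_size=3, dilation=2) -> (3 - 1) * 2 + 1 = 5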
def _get_input_block(
self,
output_block: _Shape,
input_shape: _Shape,
dtype: str,
op_type: str,
partkernel: bool,
stride_h: int,
stride_w: int,
dilated_kernel_h: int,
dilated_kernel_w: int,
upscaling_factor: int,
) -> _Shape:
height = self._get_input_size(
output_block.height,
stride_h,
min(dilated_kernel_h, self._subkernel_limits[0]),
upscaling_factor,
)
width = self._get_input_size(
output_block.width,
stride_w,
min(dilated_kernel_w, self._subkernel_limits[1]),
upscaling_factor,
)
if op_type == "ethosu_conv2d":
if dtype == "int8":
if partkernel:
depth = self._align(min(32, input_shape.depth), 8)
else:
depth = self._align(min(16, input_shape.depth), 8)
elif dtype == "int16":
depth = self._align(min(16, input_shape.depth), 4)
else:
depth = self._align(min(8, input_shape.depth), 2)
else:
depth = output_block.depth
return _Shape(
[
1,
self._align(height, self._micro_block.height),
self._align(width, self._micro_block.width),
depth,
]
)
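# Worked example (ethos-u55-256, int8 part-kernel-first conv2d): an output
# block of 1x8x8x16 with a 3x3 kernel, stride 1 and an IFM depth of 20 needs
# an input block of height and width (8 - 1) * 1 + 3 = 10 (already a multiple
# of the 2x2 micro block) and depth _align(min(32, 20), 8) = 24, i.e. 1x10x10x24.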
def get_kernel_steps(
self,
op_type: str,
dilated_kernel_h: int,
dilated_kernel_w: int,
ifm_dtype: str,
partkernel: bool = False,
) -> List[int]:
"""Calculate the total number of subkernels and their sizes
Parameters
----------
op_type : str
The NPU primitive operator
"ethosu_pooling"
dilated_kernel_h: int
Height of dilated kernel
dilated_kernel_w: int
Width of dilated kernel
ifm_dtype: str
Datatype of the Input Feature Map tensor (IFM)
partkernel: bool
Flag showing whether part-kernel first traversal is used
Returns
-------
List[int]
List where each entry is the number of kernel steps needed to process one subkernel
"""
if op_type == "ethosu_binary_elementwise":
return [1]
subkernels = self._get_subkernels(dilated_kernel_h, dilated_kernel_w)
# Determine the number of kernel steps per subkernel
kernel_steps = []
for y, x in subkernels:
subkernel_elements = x * y
if op_type == "ethosu_conv2d" and partkernel:
# Part-kernel-first traversal conv2d
divisor = 4 if ifm_dtype == "int8" else 2
kernel_steps.append(int(_round_up_div(subkernel_elements, divisor)))
elif op_type == "ethosu_depthwise_conv2d":
kernel_steps.append(int(_round_up_div(subkernel_elements, 4)))
else:
# Depth-first traversal conv2d or pooling
kernel_steps.append(int(subkernel_elements))
return kernel_steps
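# Worked example: a 3x3 kernel fits in a single 8x8 subkernel (9 elements).
# Part-kernel-first int8 conv2d packs 4 elements per step, so the result is
# [ceil(9 / 4)] == [3]; depthwise conv2d also gives [3], while depth-first
# conv2d or pooling gives [9].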
def _get_subkernels(self, dilated_kernel_h: int, dilated_kernel_w: int):
num_subkernels_y = _round_up_div(dilated_kernel_h, self._subkernel_limits[0])
num_subkernels_x = _round_up_div(dilated_kernel_w, self._subkernel_limits[1])
subkernels_y = [
min((dilated_kernel_h - i * self._subkernel_limits[0]), self._subkernel_limits[0])
for i in range(num_subkernels_y)
]
subkernels_x = [
min((dilated_kernel_w - i * self._subkernel_limits[1]), self._subkernel_limits[1])
for i in range(num_subkernels_x)
]
subkernels = []
for y in subkernels_y:
for x in subkernels_x:
subkernels.append((y, x))
return subkernels
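# Illustrative: with the 8x8 subkernel limits a dilated 9x9 kernel is split
# into four subkernels: (8, 8), (8, 1), (1, 8) and (1, 1).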
def _get_accumulator_width(self, op_type: str, ifm_dtype: str):
if ifm_dtype == "int16" and op_type != "ethosu_pooling":
return 5
return 4
def is_partkernel(
self, op_type: str, ifm_channels: int, ifm_dtype: str, kernel_elements: int
) -> bool:
"""Determine which block traversal strategy has better DPU utilization
Parameters
----------
op_type: str
The NPU primitive operator
"ethosu_pooling"
ifm_channels: int
Number of input channels
ifm_dtype: str
Datatype of the Input Feature Map tensor (IFM)
kernel_elements: int
Total number of elements in the kernel
Returns
-------
bool
True if part-kernel-first traversal gives the better DPU utilization
"""
if op_type != "ethosu_conv2d":
return False
depth_first_utilization = ifm_channels / _round_up(
ifm_channels, 32 if ifm_dtype == "int8" else 16
)
part_kernel_first_utilization = (ifm_channels / _round_up(ifm_channels, 8)) * (
kernel_elements / _round_up(kernel_elements, 4 if ifm_dtype == "int8" else 2)
)
return part_kernel_first_utilization > depth_first_utilization or ifm_channels <= 8
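# Worked example: an int8 conv2d with a 3x3 kernel (9 elements) and 16 input
# channels gives a depth-first utilization of 16 / 32 = 0.5 and a
# part-kernel-first utilization of (16 / 16) * (9 / 12) = 0.75, so
# part-kernel-first traversal is selected.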
def _get_input_banks(self, input_block_shape, input_bytewidth):
input_bytes = input_block_shape.area() * self._align(
input_block_shape.depth * input_bytewidth, 8
)
input_banks = _round_up_div(input_bytes, self._bank_size_bytes) * 2
input_banks = _round_up(input_banks, self._input_granularity[input_bytewidth])
return input_banks
def _get_accumulator_banks(self, output_block_shape, acc_bytewidth):
acc_depth = _round_up(output_block_shape.depth, 8)
acc_bytes = output_block_shape.area() * self._align(acc_depth, 8) * acc_bytewidth
acc_banks = _round_up_div(acc_bytes, self._bank_size_bytes) * 2
acc_banks = _round_up(acc_banks, self._accumulator_granularity[acc_bytewidth])
return acc_banks
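# Worked example (ethos-u55-256): an int8 input block of 1x10x10x24 needs
# 10 * 10 * _align(24 * 1, 8) = 2400 bytes -> 3 banks, doubled to 6 and
# rounded up to the 8-bank int8 input granularity. A 1x8x8x16 output block
# with 4-byte accumulators needs 8 * 8 * 16 * 4 = 4096 bytes -> 4 banks,
# doubled to 8 and rounded up to the 16-bank accumulator granularity.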
@staticmethod
def _create_layout_block(nhwc_block_config, layout):
"""A helper function to convert to brick layout"""
if layout == "NHCWB16":
return [
nhwc_block_config[0],
nhwc_block_config[1],
1 + ((nhwc_block_config[3] - 1) // 16),
nhwc_block_config[2],
16,
]
# otherwise the layout can only be NHWC
return nhwc_block_config
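# Illustrative: _create_layout_block([1, 8, 8, 20], "NHCWB16") returns
# [1, 8, 2, 8, 16] - the 20 channels become 1 + (20 - 1) // 16 = 2 bricks of 16.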
def get_elementwise_block_config(
self,
ifm_propagator: Propagator,
ifm2_propagator: Optional[Propagator],
op_attrs: Dict,
ofm_shape: List[int],
output_layout: str,
input_layout: str,
input2_layout: Optional[str],
ifm_dtype: str,
ofm_dtype: str,
) -> List[BlockConfig]:
"""Get a suitable block config for an elementwise operator
Parameters
----------
ifm_propagator: Propagator,
The propagator containing the data dependencies between input and output
ifm2_propagator: Propagator,
The propagator containing the data dependencies between input2 and output
op_attrs: Dict,
Dictionary containing operator attributes
ofm_shape: List[int],
Shape of the output tensor
output_layout: str,
The layout of the Output Feature Map tensor. Can be "NHWC" or "NHCWB16".
input_layout: str,
The layout of the Input Feature Map tensor. Can be "NHWC" or "NHCWB16".
input2_layout: str,
The layout of the Input2 Feature Map tensor. Can be "NHWC" or "NHCWB16".
ifm_dtype: str,
Datatype of the Input Feature Map tensor (IFM)
ofm_dtype: str,
Datatype of the Output Feature Map tensor (OFM)
Returns
-------
List[BlockConfig]
List containing a single suitable block config
"""
block_config = []
output_shape = [int(a) for a in ofm_shape]
op_type = op_attrs.get("op")
op_str = op_attrs.get("op_str")
activation = op_attrs.get("activation", "NONE")
input_bytewidth = 1 if ifm_dtype == "int8" else 2 if ifm_dtype == "int16" else 4
banks_available = self._total_banks - self._reserved_banks
if activation == "LUT" and not self._lut_reserved:
banks_available -= 2
# Handle user-forced block config
options = tvm.transform.PassContext.current().config.get("relay.ext.ethos-u.options", None)
if options and options.dev_force_block_config:
block_config = [int(v) for v in options.dev_force_block_config.split("x")]
assert len(block_config) == 3
if output_layout == "NHWC":
block_shape = [output_shape[0], block_config[0], block_config[1], block_config[2]]
else:
block_shape = [
output_shape[0],
block_config[0],
1 + ((block_config[2] - 1) // 16),
block_config[1],
16,
]
output_cycles = self._get_output_cycles(
op_type, op_str, ifm_dtype, ofm_dtype, activation
)
output_cycles *= reduce(lambda a, b: a * b, block_shape, 1)
output_cycles = int(math.ceil(output_cycles))
return [BlockConfig(block_shape, block_shape, 0, output_cycles)]
# Split the block in half until it fits into SHRAM
max_height, max_width, max_depth = self._max_block_shape.as_list()[1:]
if output_layout == "NHCWB16":
output_height = output_shape[1]
output_width = output_shape[3]
output_channels = output_shape[2] * 16
else:
output_height = output_shape[1]
output_width = output_shape[2]
output_channels = output_shape[3]
output_nhwc_block = [
1,
_round_up(min(output_height, max_height), self._micro_block.height),
_round_up(min(output_width, max_width), self._micro_block.width),
_round_up(min(output_channels, max_depth), self._micro_block.depth),
]
output_block = self._create_layout_block(output_nhwc_block, output_layout)
split_order = (a for a in [1, 2, 3])
split_axis = next(split_order)
offset = [0] * len(output_block)
stripes = [1] * len(output_block)
order = [1, 2, 4, 3, 0] if output_layout == "NHCWB16" else [1, 2, 3, 4]
while True:
# Create stripe config for output block
output_stripe_config = StripeConfig(
output_block, output_block, output_block, order, stripes, offset
)
# Propagate the output to obtain the two input blocks
input_block = _Shape(ifm_propagator.propagate(output_stripe_config).shape, input_layout)
if ifm2_propagator:
input2_block = _Shape(
ifm2_propagator.propagate(output_stripe_config).shape, input2_layout
)
else:
# Unary elementwise
input2_block = input_block
input_block.round_up(self._input_micro_block)
input2_block.round_up(self._input_micro_block)
# Banks required for input block
input_banks = self._get_input_banks(input_block, input_bytewidth)
# Banks required for input2 block
input2_banks = self._get_input_banks(input2_block, input_bytewidth)
# Check whether or not both IFMs fit into SHRAM
if (input_banks + input2_banks) <= banks_available:
output_cycles = self._get_output_cycles(
op_type, op_str, ifm_dtype, ofm_dtype, activation
)
output_cycles *= reduce(lambda a, b: a * b, output_block, 1)
output_cycles = int(math.ceil(output_cycles))
block_config.append(
BlockConfig(input_block.as_list(), output_block, 0, output_cycles)
)
break
if output_nhwc_block[split_axis] == self._micro_block.as_list()[split_axis]:
split_axis = next(split_order)
output_nhwc_block[split_axis] = _round_up(
_round_up_div(output_nhwc_block[split_axis], 2),
self._micro_block.as_list()[split_axis],
)
output_block = self._create_layout_block(output_nhwc_block, output_layout)
return block_config
def _get_subkernel_propagator(
self, op_attrs, ifm_propagator, input_layout, output_layout, depth
):
op_type = op_attrs.get("op")
stride_h = int(op_attrs.get("stride_h", 1))
stride_w = int(op_attrs.get("stride_w", 1))
transform = ifm_propagator.transform
if op_type != "ethosu_identity":
if input_layout == "NHCWB16":
transform[1][-1] = min(transform[1][-1], self._subkernel_limits[0] - stride_h)
transform[3][-1] = min(transform[3][-1], self._subkernel_limits[1] - stride_w)
else:
transform[1][-1] = min(transform[1][-1], self._subkernel_limits[0] - stride_h)
transform[2][-1] = min(transform[2][-1], self._subkernel_limits[1] - stride_w)
if op_type in ("ethosu_pooling", "ethosu_depthwise_conv2d"):
if output_layout == "NHCWB16" and input_layout == "NHWC":
transform[3][-1] = depth
elif output_layout == "NHCWB16" and input_layout == "NHCWB16":
transform[2][-1] = 1 + ((depth - 1) // 16)
return Propagator(transform, ifm_propagator.offset)
def get_valid_block_configs(
self,
ifm_propagator: Propagator,
op_attrs: Dict,
ofm_shape: List[int],
ofm_channels: int,
ifm_channels: int,
output_layout: str,
input_layout: str,
ifm_dtype: str,
ofm_dtype: str,
kernel_h: int = 1,
kernel_w: int = 1,
) -> List[BlockConfig]:
"""Get all of the valid block configs
Parameters
----------
ifm_propagator: Propagator,
The propagator containing the data dependencies between input and output
op_attrs: Dict,
Dictionary containing operator attributes
ofm_shape: List[int],
Shape of the output tensor
ofm_channels: int,
Number of output channels
ifm_channels: int,
Number of input channels
output_layout: str,
The layout of the Output Feature Map tensor. Can be "NHWC" or "NHCWB16".
input_layout: str,
The layout of the Input Feature Map tensor. Can be "NHWC" or "NHCWB16".
ifm_dtype: str,
Datatype of the Input Feature Map tensor (IFM)
ofm_dtype: str,
Datatype of the Output Feature Map tensor (OFM)
kernel_h: int,
Height of kernel
kernel_w: int
Width of kernel
Returns
-------
List[BlockConfig]
List containing all of the valid block configs
"""
valid_block_configs = []
op_type = op_attrs.get("op")
op_str = op_attrs.get("op_str")
activation = op_attrs.get("activation", "NONE")
upscaling_factor = 1 if op_attrs.get("upscale", "NONE") == "NONE" else 2
if output_layout == "NHCWB16":
output_shape = _Shape([1, ofm_shape[1], ofm_shape[3], ofm_channels])
else:
output_shape = _Shape(ofm_shape)
# Define search space
max_height = min(output_shape.height, self._max_block_shape.height)
min_height = max(self._micro_block.height, upscaling_factor)
max_width = min(output_shape.width, self._max_block_shape.width)
min_width = max(self._micro_block.width, upscaling_factor)
max_depth = min(ofm_channels, self._max_block_shape.depth)
min_depth = max(self._micro_block.depth, upscaling_factor)
heights = range(min_height, max_height + min_height, min_height)
widths = range(min_width, max_width + min_width, min_width)
depths = range(min_depth, max_depth + min_depth, min_depth)
# Handle user-forced block config
options = tvm.transform.PassContext.current().config.get("relay.ext.ethos-u.options", None)
forced = False
if options and options.dev_force_block_config:
block_config = [int(v) for v in options.dev_force_block_config.split("x")]
assert len(block_config) == 3
heights = [block_config[0]]
widths = [block_config[1]]
depths = [block_config[2]]
forced = True
input_bytewidth = 1 if ifm_dtype == "int8" else 2
acc_bytewidth = self._get_accumulator_width(op_type, ifm_dtype)
banks_available = self._total_banks - self._reserved_banks
if activation == "LUT" and not self._lut_reserved:
banks_available -= 2
# Input block depth has additional limitations for operators that require full input depth
input_block_depth = 0
partkernel = self.is_partkernel(op_type, ifm_channels, ifm_dtype, kernel_h * kernel_w)
if op_type == "ethosu_conv2d":
if partkernel:
input_block_depth = min(ifm_channels, 16)
else:
input_block_depth = min(ifm_channels, 32)
for depth in reversed(depths):
if (depth < output_shape.depth) and (depth % self._split_depth != 0) and not forced:
# Block depth must be a multiple of the split depth unless it covers the full output depth
continue
subkernel_propagator = self._get_subkernel_propagator(
op_attrs, ifm_propagator, input_layout, output_layout, depth
)
for width in reversed(widths):
for height in reversed(heights):
if output_layout == "NHCWB16":
output_block = (
1,
height,
1 + ((depth - 1) // 16),
width,
16,
)
order = [1, 2, 4, 3, 0]
else:
output_block = (1, height, width, depth)
order = [1, 2, 3, 4]
offset = [0] * len(output_block)
stripes = [1] * len(output_block)
block_stripe_config = StripeConfig(
output_block,
output_block,
output_block,
order,
stripes,
offset,
)
# Propagate output block
input_block = subkernel_propagator.propagate(block_stripe_config)
input_block_shape = _Shape(input_block.shape, input_layout)
input_block_shape.round_up(self._input_micro_block)
output_block_shape = _Shape(output_block, output_layout)
if op_type == "ethosu_conv2d":
input_block_shape.depth = input_block_depth
# Banks required for input block
input_banks = self._get_input_banks(input_block_shape, input_bytewidth)
# Banks required for accumulation
acc_banks = self._get_accumulator_banks(output_block_shape, acc_bytewidth)
if (input_banks + acc_banks) <= banks_available:
output_cycles = self._get_output_cycles(
op_type, op_str, ifm_dtype, ofm_dtype, activation
)
output_cycles *= np.prod(output_block).tolist()
output_cycles = int(math.ceil(output_cycles))
compute_cycles = self._estimate_compute_cycles_per_block(
op_type,
output_block_shape,
input_block_shape,
kernel_h,
kernel_w,
ifm_channels,
"int8",
partkernel,
)
block_config = BlockConfig(
input_block_shape.as_list(), output_block, compute_cycles, output_cycles
)
if self._disable_block_culling:
# Block culling disabled - add all block configs that fit
valid_block_configs.append(block_config)
else:
# Add block config only if it's not dominated by an existing block.
# A block config is dominated by another if its output_shape is greater
# or equal in every dimension and strictly greater in at least one
# dimension.
dominated = False
for valid_block in valid_block_configs:
if block_config < valid_block:
dominated = True
break
if not dominated:
valid_block_configs.append(block_config)
# Every consecutive block in the innermost loop will be dominated by
# this one so break
break
return valid_block_configs
def _estimate_compute_cycles_per_block(
self,
op_type: str,
block_shape: _Shape,
input_block_shape: _Shape,
kernel_h: int,
kernel_w: int,
input_channels: int,
ifm_dtype: str,
partkernel: bool = False,
) -> int:
# Calculate the number of micro blocks per block, per axis
num_quantum_x = _round_up_div(block_shape.width, self._micro_block.width)
num_quantum_y = _round_up_div(block_shape.height, self._micro_block.height)
num_quantum_z = _round_up_div(block_shape.depth, self._micro_block.depth)
num_quantum_xy = num_quantum_x * num_quantum_y
kernel_steps = self.get_kernel_steps(op_type, kernel_h, kernel_w, ifm_dtype, partkernel)
wd_cycles = self._get_weight_decoder_cycles(op_type)
delay_cycles = self._get_delay_cycles(op_type, ifm_dtype)
cycle_quantum = 4
compute_cycles = 0
for subkernel_steps in kernel_steps:
subkernel_cycles = 1 if op_type == "ethosu_pooling" else subkernel_steps
compute_cycles += (
max(wd_cycles, cycle_quantum * num_quantum_xy) * subkernel_cycles * num_quantum_z
)
if num_quantum_xy == 1:
if num_quantum_z == 1:
compute_cycles += delay_cycles * subkernel_steps
elif subkernel_steps > 1:
compute_cycles += delay_cycles * (subkernel_steps - 1) * num_quantum_z
if partkernel:
compute_cycles *= _round_up_div(input_block_shape.depth, 8)
if op_type == "ethosu_conv2d":
compute_cycles *= _round_up_div(input_channels, input_block_shape.depth)
return compute_cycles
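# Worked example (ethos-u55-256, depth-first int8 conv2d, 3x3 kernel, 32 input
# channels): an output block of 1x8x8x16 spans 4x4x2 micro blocks, so the
# single 9-step subkernel costs max(32, 4 * 16) * 9 * 2 = 1152 cycles; with the
# full 32-channel input block no further depth scaling is applied.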