blob: 37565f0b9eb9b9ef4b1e0e2b6df877062797786a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Pipeline executor that executes a series of modules in a pipeline fashion."""
import json
import os
import tvm._ffi
from tvm import relay
from tvm.relay.transform import InferType
from tvm.contrib.pipeline_executor import PipelineExecutorFactoryModule
def pipeline_executor_build_enabled():
"""Check if the pipeline executor build is enabled.
Return
-------
enable: bool
Return whether the pipeline executor is enabled.
"""
return tvm.contrib.pipeline_executor.pipeline_executor_enabled()
def build(pipe_configs):
"""Build modules used in the pipeline executor, then use these modules and configuration
to create a pipeline executor.
Parameters
----------
pipe_configs: PipelineConfig
Build Configuration information.
Returns
-------
ret: PipelineExecutorFactoryModule
Common interface for pipeline executor factory modules.
"""
libs = {}
config = pipe_configs.get_config()
if "module_connection" not in config:
raise RuntimeError('"module_connection" is missing')
if "input_connection" not in config:
raise RuntimeError('"input_connection" is missing')
if "param_connection" not in config:
raise RuntimeError('"param_connection" is missing')
mod_n_configs = config["module_connection"]
config_len = len(mod_n_configs)
module_string_config = [{} for _ in range(config_len)]
# Use hardware configurations to build backend modules for each subgraph.
for ir_mod, mod_config in mod_n_configs.items():
pipe_config = mod_config["pipeline"].copy()
mod_idx = pipe_config["mod_idx"]
dev = mod_config["dev"]
target = mod_config["target"]
build_func = relay.build
# Callers may need to use a customized building function to wrap the pre-building logic
# and the backend building logic. For example, in order to support a backend which only
# can do "int8" computation, the caller may need to merge the "quantization" logic
# into the building logic to creat a customized building function.
if "build" in mod_config and mod_config["build"]:
build_func = mod_config["build"]
lib = build_func(
ir_mod,
target,
params=mod_config["params"],
target_host=mod_config["target_host"],
mod_name=mod_config["mod_name"],
)
pipe_config["dev"] = "{},{}".format(dev.device_type, dev.device_id)
# Use "mod_idx" as the key to create a "module_connection" map which is not only
# for the module index but also for the module connection used to build the pipeline.
module_string_config[mod_idx] = pipe_config
libs[mod_idx] = {
"lib": lib,
"dev": dev,
"fcompile": mod_config["fcompile"],
"export_cc": mod_config["export_cc"],
}
# Creating a text form configuration to record the "input_connection" and the
# "module_connection" information. The "input_connection" is used to record the
# map of global input and subgraph input, and the "module_connection" is used to
# record module dependency.
string_config = {}
string_config["param_connection"] = config["param_connection"]
string_config["input_connection"] = config["input_connection"]
string_config["module_connection"] = module_string_config
return PipelineExecutorFactoryModule(libs, string_config)
def export_library(factory, directory_path):
"""Export the pipeline executor into disk files.
Parameters
----------
factory : PipelineExecutorFactoryModule
The pipeline executor factory
directory_path : str
Export the files to this directory.
"""
if not factory.pipeline_mods:
raise RuntimeError("The pipeline executor has not been initialized.")
# Check if the directory_path exists.
if not directory_path or not os.path.exists(directory_path):
raise RuntimeError("The directory {directory_path} does not exist.")
# Create an load configuration.
load_config_file_name = "{}/load_config".format(directory_path)
pipeline_config_file_name = "{}/pipeline_config".format(directory_path)
config = {}
config["load_config"] = load_config_file_name
config["pipeline_config"] = pipeline_config_file_name
load_config = []
# Export the library, JSON, and parameter into files, then export these files path
# into a configuration file.
for lib_index in factory.pipeline_mods:
mconfig = {}
mconfig["mod_idx"] = lib_index
mconfig["lib_name"] = "{}/lib{}.so".format(directory_path, lib_index)
mconfig["json_name"] = "{}/json{}".format(directory_path, lib_index)
mconfig["params_name"] = "{}/params{}".format(directory_path, lib_index)
lib_config = factory.pipeline_mods[lib_index]
mconfig["dev"] = "{},{}".format(lib_config["dev"].device_type, lib_config["dev"].device_id)
fcompile = lib_config["fcompile"]
if not fcompile:
fcompile = False
# Get the graph, lib, and parameters from GraphExecutorFactoryModule.
lib = factory.pipeline_mods[lib_index]["lib"]
# Export the lib, graph, and parameters to disk.
lib.export_library(mconfig["lib_name"], fcompile)
with open(mconfig["json_name"], "w") as file_handle:
file_handle.write(lib.graph_json)
with open(mconfig["params_name"], "wb") as file_handle:
file_handle.write(relay.save_param_dict(lib.params))
load_config.append(mconfig)
with open(load_config_file_name, "w") as file_handle:
json.dump(load_config, file_handle)
with open(pipeline_config_file_name, "w") as file_handle:
json.dump(factory.mods_config, file_handle)
config_file_name = "{}/config".format(directory_path)
with open(config_file_name, "w") as file_handle:
json.dump(config, file_handle)
return config_file_name
class PipelineConfig(object):
"""Pipeline configuration information, this class contains the DAG that expresses
the dependency of each module involved in a pipeline and the parameters for building
each module.
"""
class Binding:
"""This class defines the module connections information.
The type can only be "input" or "output".
Parameters
----------
owner : ModuleWrapper
The class who owns this interface.
io_type : str
The I/O type of this interface. It can only be "input" or "output".
name : str/integer
Name, for input it is string such as "data0", for output it is an integer such as 0.
data_type: TensorType
The data type of this interface.
"""
def __init__(self, owner, io_type, name, data_type=None):
self.io_owner = owner
self.io_type = io_type
self.name = str(name)
# Child interfaces that depend on this interface.
self.bindings = []
# Parents interfaces that this interface depend on.
self.parents = []
self.data_type = data_type
def get_name(self):
# Return name of this interface and the name of owner who owns this interface.
owner_name = ""
if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
owner_name = self.io_owner.name
return owner_name, self.name
def get_owner_idx(self):
# If the owner is ModuleWrapper return the owner index, if not return 0.
if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
return self.io_owner.idx
return -1
def is_pipeline_executor_interface(self):
"""The pipeline interface is used to interact with the caller. There are two types
of interfaces, one is 'input' another is 'output'. The pipeline input interface
is responsible for passing parameters to the internal module interface, and the
pipeline output interface is responsible for outputting the results computed by
the pipeline executor to the caller.
"""
return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper)
def __repr__(self):
# Geting the binding information in the form of text.
str_format = " |{}: ".format(self.name)
for binding in self.bindings:
mname, dname = binding.get_name()
str_format += "{0}:{1} ".format(mname, dname)
return str_format
def check_binding_dict(self, connection_dict):
"""Checking the binding dictionary.
Parameter
---------
connection_dict : Dict[str, Any]
It is a dictionary of module connections.
"""
if "interface_name" not in connection_dict:
raise RuntimeError('"inteface_name" is missing in global config!"')
if "connection" not in connection_dict:
raise RuntimeError(f'"connection" is missing!"')
# The global interface mapping should be one-to-one.
if not connection_dict["connection"]:
raise RuntimeError("The global interface map is empty!")
if len(connection_dict["connection"]) > 1:
raise RuntimeError("A global interface maps multiple module interfaces!")
if "mod_idx" not in connection_dict["connection"][0]:
raise RuntimeError('"mod_idx" is missing!')
def get_binding_dict(self):
"""Returning the binding information in the form of dictionary.
Returns
-------
data : Dict[str, Any]
The binding information is in the form of dictionary.
"""
dict_format = {"interface_name": self.name, "connection": []}
for binding in self.bindings:
_, dname = binding.get_name()
midx = binding.get_owner_idx()
dict_format["connection"].append({"mod_idx": midx, "interface_name": dname})
self.check_binding_dict(dict_format)
return dict_format
def check_dag_acyclic(self, start, inputs):
"""This is to check whether the DAG containing these input interfaces is acyclic.
Parameters
----------
start: ModuleWrapper
The starting node of the cycle check algorithm.
inputs: Binding
These interfaces are used to connect to each other to build DAG.
Return
------
Return true if there is no cycle in the DAG.
"""
for binding in inputs.values():
if start == binding.io_owner:
return False
for p in binding.parents:
if not self.check_dag_acyclic(start, p.io_owner.input_bindings.bindings):
return False
return True
def connect(self, binding):
"""Connect the current interface to the destination interface.
Correct connections are as follows: 1. the pipeline input connected to a module input,
2. the module output connected to a pipeline output, 3. the module output connected to
a module input.
Parameters
----------
binding: Binding
The destination of this connection.
"""
# Check whether the binding setting is correct or not.
if self.io_owner == binding.io_owner:
raise RuntimeError("Can not bind itself.")
if self.io_type == "param" and not self.is_pipeline_executor_interface():
raise RuntimeError(
'The "param" binding can only be used by a pipeline executor interface!'
)
if not self.is_pipeline_executor_interface() and self.io_type == "input":
raise RuntimeError("Module can only bind from output interface!")
if self.io_type == "param" and binding.io_type != "param":
raise RuntimeError(
'A global "param" interface can only be bind with a module "param" interface!'
)
if (
not self.is_pipeline_executor_interface()
and not binding.is_pipeline_executor_interface()
and binding.io_type == "output"
):
raise RuntimeError("Can not bind module output with another module output!")
if (
not self.is_pipeline_executor_interface()
and binding.is_pipeline_executor_interface()
and binding.io_type == "input"
):
raise RuntimeError("Can not bind module output with pipeline input!")
if self.is_pipeline_executor_interface() and self.io_type == "output":
raise RuntimeError("Global output can not be used as binding start point.")
if (
self.is_pipeline_executor_interface()
and self.io_type == "input"
and binding.io_type != "input"
):
raise RuntimeError("Global input can only bind with module input.")
self.bindings.append(binding)
if not self.is_pipeline_executor_interface():
# Check whether the data types of the source and destination are the same.
if (
isinstance(binding.io_owner, PipelineConfig.ModuleWrapper)
and self.data_type != binding.data_type
):
raise RuntimeError(
f"Illegal type (%s vs. %s): binding type is not same!"
% (self.data_type, binding.data_type)
)
binding.parents.append(self)
# Do acyclic check after increasing the in-degree of child node by setting
# current interface as a parent of the child node.
if not self.check_dag_acyclic(
binding.io_owner, self.io_owner.input_bindings.bindings
):
raise RuntimeError("Illegal connection: Cause a cycle!")
class BindingList:
"""Container for bindings(input or output interface).
Parameters
----------
owner : ModuleWrapper/PipelineConfig
The owner of this class can be ModuleWrapper or PipelineConfig.
io_type : str
The type of this class can be "input" or "output".
"""
def __init__(self, owner, io_type):
self.bindings = {}
self.io_owner = owner
self.binding_type = io_type
def get_binding_data_type(self, key):
if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
return self.io_owner.get_data_type(key, self.binding_type)
return None
def __getitem__(self, key):
if key not in self.bindings:
data_type = self.get_binding_data_type(key)
if not data_type and isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
raise RuntimeError(f"Can not find {key} in binding list {self.binding_type}.")
self.bindings[key] = PipelineConfig.Binding(
self.io_owner, self.binding_type, key, data_type
)
return self.bindings[key]
class ModuleWrapper:
"""This class is a wrapper representing the module and contains information such as
module information, binding information and building information.
"""
def __init__(self, mod=None):
self.target_host = None
self.build_func = None
self.params = None
self.target = None
self.fcompile = None
self.name = None
self.dev = None
self.export_cc = None
self.cpu_affinity = ""
self.idx = None
self.mod = mod
self.input_params = InferType()(mod)["main"].params
self.output_type = InferType()(mod)["main"].checked_type.ret_type
self.input_bindings = PipelineConfig.BindingList(self, "input")
self.output_bindings = PipelineConfig.BindingList(self, "output")
self.param_binding = PipelineConfig.Binding(self, "param", "param")
def __eq__(self, other):
if isinstance(other, PipelineConfig.ModuleWrapper):
return self.mod == other.mod
return False
def __getitem__(self, key):
if isinstance(key, str):
if key == "input":
return self.input_bindings
if key == "output":
return self.output_bindings
if key == "param":
return self.param_binding
raise RuntimeError(f"{key} not found!")
raise RuntimeError('The data type of "key" is not supported!')
def get_data_type(self, key, interface_type):
"""Get the module interface data type according to the key value and interface type.
Parameters
----------
key: str
The interface name.
interface_type:
The interface type.
Return
-------
Return data type.
"""
if interface_type == "input":
for param in self.input_params:
if param.name_hint == key:
return param._checked_type_
if interface_type == "output":
if isinstance(self.output_type, tvm.ir.type.TupleType):
if int(key) < len(self.output_type.fields):
return self.output_type.fields[int(key)]
elif int(key) == 0:
return self.output_type
return None
def set_idx_name(self, idx):
# Set the index value and generate the module name.
self.idx = idx
self.name = "mod{}".format(str(idx))
def is_root_mod(self):
"""Check whether this node is the root node in DAG, this function is used
in topological sort.
"""
return all([not b.parents for b in self.input_bindings.bindings.values()])
def remove_self_from_bindings(self):
"""Remove the current node from child dependencies to reduce the in-degree
of child node, this function is used in topological sort.
"""
for binding in self.output_bindings.bindings.values():
for child in binding.bindings:
if binding in child.parents:
child.parents.remove(binding)
def __init__(self):
self.mod_wrapper = {}
self.input_bindings = self.BindingList(self, "input")
self.output_bindings = self.BindingList(self, "output")
# There is a map of global parameters group and module index.
self.param_group_bindings = self.BindingList(self, "param")
def __str__(self):
# Get configuration information as a string.
# Use topological sort to get correct module order.
self.dag_topology_sort()
# Getting the parameters dependencies.
param_dump = "Params\n"
for param_name in self.param_group_bindings.bindings:
inf = self.param_group_bindings.bindings[param_name]
param_dump += str(inf) + "\n"
# Get the input dependencies.
input_dump = "\nInputs\n"
for input_name in self.input_bindings.bindings:
inf = self.input_bindings.bindings[input_name]
input_dump += str(inf) + "\n"
# Get the connections information of each module.
output = {}
connections_dump = "\nconnections\n"
for mod in self.mod_wrapper:
for interface in self.mod_wrapper[mod].output_bindings.bindings.values():
if interface.bindings:
mname, dname = interface.get_name()
iname = mname + ".output(" + dname + ")->"
for dep in interface.bindings:
dep_mname, dep_dname = dep.get_name()
if isinstance(dep.io_owner, PipelineConfig.ModuleWrapper):
iname += f" {dep_mname}.{dep_dname}"
connections_dump += f" |{iname}\n"
else:
output[dep_dname] = f"{mname}.output({dname})"
# Get the output dependencies.
output_dump = "\noutput\n"
for name in sorted(output.keys()):
output_dump += f" |output({name}) : {output[name]}\n"
return param_dump + input_dump + output_dump + connections_dump
def __getitem__(self, key):
if isinstance(key, tvm.ir.module.IRModule):
if key not in self.mod_wrapper:
self.mod_wrapper[key] = self.ModuleWrapper(key)
return self.mod_wrapper[key]
if isinstance(key, str):
if key == "input":
return self.input_bindings
if key == "output":
return self.output_bindings
if key == "param_group":
return self.param_group_bindings
raise RuntimeError(f"{key} not found!")
raise RuntimeError(f'The key type "{type(key)}" is not supported!')
def get_config(self):
"""Get the configuration information in dictionary form, this configuration
will be used to create pipeline executor.
"""
# Use topological sort to get the correct order of modules.
self.dag_topology_sort()
mconfig = {}
module_connection = {}
for mod in self.mod_wrapper:
# Generate pipeline configuration.
mconf = {}
output_conf = []
module = self.mod_wrapper[mod]
for _, binding in module.output_bindings.bindings.items():
dep_conf = []
output = {}
if binding.bindings:
for dep in binding.bindings:
dep_item = {}
_, dname = dep.get_name()
if dep.is_pipeline_executor_interface():
dep_item["global_output_index"] = int(dname)
else:
dep_item["mod_idx"] = dep.get_owner_idx()
dep_item["input_name"] = dname
dep_conf.append(dep_item)
# The value of output_idx start from 0.
output["output_idx"] = int(binding.name)
output["dependencies"] = dep_conf
output_conf.append(output)
mconf["mod_idx"] = module.idx
mconf["cpu_affinity"] = module.cpu_affinity
mconf["output"] = output_conf
module_connection[mod] = {
"pipeline": mconf,
"target_host": module.target_host,
"mod_name": "default",
"build": module.build_func,
"params": module.params,
"target": module.target,
"fcompile": module.fcompile,
"dev": module.dev,
"export_cc": module.export_cc,
}
# Creating a map including pipeline inputs and subgraph inputs.
input_connection = []
for input_name in self.input_bindings.bindings:
input_dict = self.input_bindings.bindings[input_name].get_binding_dict()
if "interface_name" not in input_dict["connection"][0]:
raise RuntimeError("interface_name is missing in connection config!")
# Creating the map including global interfaces and subgraph interfaces.
input_map = {
"global_interface_name": input_dict["interface_name"],
"mod_idx": input_dict["connection"][0]["mod_idx"],
"module_interface_name": input_dict["connection"][0]["interface_name"],
}
input_connection.append(input_map)
# Create a map including global parameters groups and modules.
param_connection = []
for param_name in self.param_group_bindings.bindings:
param_dict = self.param_group_bindings.bindings[param_name].get_binding_dict()
param_map = {
"global_param_name": param_dict["interface_name"],
"mod_idx": param_dict["connection"][0]["mod_idx"],
}
param_connection.append(param_map)
mconfig["module_connection"] = module_connection
mconfig["input_connection"] = input_connection
mconfig["param_connection"] = param_connection
return mconfig
def dag_topology_sort(self):
"""Use topological sort to get order of pipeline modules."""
mlist = []
mod_wrapper = self.mod_wrapper.copy()
while mod_wrapper:
temp_list = []
for mod, wrapper in mod_wrapper.items():
if wrapper.is_root_mod():
temp_list.append(mod)
wrapper.remove_self_from_bindings()
for mod in temp_list:
mod_wrapper.pop(mod, None)
mlist += temp_list
mod_wrapper_sort = {}
for mod, i in zip(mlist, range(len(mlist))):
self.mod_wrapper[mod].set_idx_name(i)
mod_wrapper_sort[mod] = self.mod_wrapper[mod]
self.mod_wrapper = mod_wrapper_sort
def get_mod_idx(self, mod):
# Return the module index.
idx = self.mod_wrapper[mod].idx
return idx
def pipe_input(self, name):
# Return the input interface according to the name.
return self.input_bindings[name]
def pipe_output(self, idx):
# Return the output interface according to the name.
return self.output_bindings[idx]