blob: 40654dc233ab1b61f1729f12c216c5cba537b064 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlCreateView;
import org.apache.flink.sql.parser.ddl.SqlNodeInfo;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.functions.AggregateFunction;
import org.apache.flink.table.api.functions.FunctionService;
import org.apache.flink.table.api.functions.ScalarFunction;
import org.apache.flink.table.api.functions.TableFunction;
import org.apache.flink.table.api.functions.UserDefinedFunction;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.FunctionEntry;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.result.BasicResult;
import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.table.client.utils.SqlJobUtil;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FunctionDescriptor;
import org.apache.flink.table.util.TableSchemaUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Executor that performs the Flink communication locally. The calls are blocking depending on the
* response time to the Flink cluster. Flink jobs are not blocking.
*/
public class LocalExecutor implements Executor {
// Logger of this executor.
private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
// Name of the environment file that the constructor looks for inside the Flink
// configuration directory when no default environment URL is passed in.
private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml";
// Counter whose next value is appended to the name of every temporary view
// registered by createView, so that each temporary view gets a distinct name.
private static final AtomicInteger TMP_VIEW_SEQUENCE_ID = new AtomicInteger(0);
// deployment: state that is handed to every newly created ExecutionContext
private final Environment defaultEnvironment;
private final List<URL> dependencies;
private final Configuration flinkConfig;
private final List<CustomCommandLine<?>> commandLines;
private final Options commandLineOptions;
// result maintenance: results are stored and looked up by their result identifier
private final ResultStore resultStore;
/**
* Cached execution context for unmodified sessions. Do not access this variable directly
* but through {@link LocalExecutor#getOrCreateExecutionContext}.
*/
private ExecutionContext<?> executionContext;
/**
 * Builds a local executor that submits table programs and retrieves their results.
 *
 * <p>The Flink configuration directory is taken from the environment. If no default
 * environment URL is supplied, the file {@code sql-client-defaults.yaml} inside that
 * directory is used when it exists; otherwise an empty environment is used.
 *
 * @param defaultEnv URL of the default environment file, or {@code null} to search for one
 * @param jars JAR files that are validated and added to the dependencies
 * @param libraries directories whose {@code .jar} files are added to the dependencies
 * @throws SqlClientException if the configuration, the environment file or a dependency
 *     cannot be loaded
 */
public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) {
    // locate and load the Flink configuration together with the deployment command lines
    final String configDirectory;
    try {
        configDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
        this.flinkConfig = GlobalConfiguration.loadConfiguration(configDirectory);

        // the default file system has to be set up from the loaded configuration
        try {
            FileSystem.initialize(this.flinkConfig);
        } catch (IOException e) {
            throw new SqlClientException(
                "Error while setting the default filesystem scheme from configuration.", e);
        }

        this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, configDirectory);
        this.commandLineOptions = collectCommandLineOptions(commandLines);
    } catch (Exception e) {
        throw new SqlClientException("Could not load Flink configuration.", e);
    }

    // fall back to the defaults file of the configuration directory if nothing was given
    URL environmentUrl = defaultEnv;
    if (environmentUrl == null) {
        final String fallbackPath = configDirectory + "/" + DEFAULT_ENV_FILE;
        System.out.println("No default environment specified.");
        System.out.print("Searching for '" + fallbackPath + "'...");
        final File fallbackFile = new File(fallbackPath);
        if (!fallbackFile.exists()) {
            System.out.println("not found.");
        } else {
            System.out.println("found.");
            try {
                environmentUrl = Path.fromLocalFile(fallbackFile).toUri().toURL();
            } catch (MalformedURLException e) {
                throw new SqlClientException(e);
            }
            LOG.info("Using default environment file: {}", environmentUrl);
        }
    }

    // parse the environment file, or start from an empty environment
    if (environmentUrl == null) {
        this.defaultEnvironment = new Environment();
    } else {
        System.out.println("Reading default environment from: " + environmentUrl);
        try {
            this.defaultEnvironment = Environment.parse(environmentUrl);
        } catch (IOException e) {
            throw new SqlClientException("Could not read default environment file at: " + environmentUrl, e);
        }
    }

    this.dependencies = discoverDependencies(jars, libraries);
    this.resultStore = new ResultStore(flinkConfig);
}
/**
 * Constructor for testing purposes: every collaborator is passed in directly instead of
 * being discovered from the Flink configuration directory.
 *
 * @param defaultEnvironment environment used as the default for every session
 * @param dependencies JAR URLs handed to the execution context
 * @param flinkConfig Flink configuration to use
 * @param commandLine the single custom command line used for deployment
 */
public LocalExecutor(Environment defaultEnvironment, List<URL> dependencies, Configuration flinkConfig, CustomCommandLine<?> commandLine) {
    this.flinkConfig = flinkConfig;
    this.defaultEnvironment = defaultEnvironment;
    this.dependencies = dependencies;

    // exactly one command line is available in this mode
    this.commandLines = Collections.singletonList(commandLine);
    this.commandLineOptions = collectCommandLineOptions(this.commandLines);

    this.resultStore = new ResultStore(flinkConfig);
}
/**
 * Starts the executor. This implementation currently performs no work.
 */
@Override
public void start() {
    // intentionally empty: there is nothing to start yet
}
/**
 * Returns the execution and deployment properties of the session's merged environment
 * as one flat map.
 *
 * @param session session whose properties are requested
 * @return a new map containing the execution entries followed by the deployment entries
 * @throws SqlExecutionException if no execution context can be created for the session
 */
@Override
public Map<String, String> getSessionProperties(SessionContext session) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final Environment merged = context.getMergedEnvironment();

    // deployment entries are added last, so they replace execution entries with equal keys
    final Map<String, String> result = new HashMap<>();
    result.putAll(merged.getExecution().asTopLevelMap());
    result.putAll(merged.getDeployment().asTopLevelMap());
    return result;
}
/**
 * Lists the tables known to a freshly created table environment of the session.
 *
 * @param session session to inspect
 * @return the table names reported by the table environment
 * @throws SqlExecutionException if no execution context can be created for the session
 */
@Override
public List<String> listTables(SessionContext session) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment env = context.createEnvironmentInstance().getTableEnvironment();
    return Arrays.asList(env.listTables());
}
/**
 * Lists the user-defined functions known to a freshly created table environment of the session.
 *
 * @param session session to inspect
 * @return the function names reported by the table environment
 * @throws SqlExecutionException if no execution context can be created for the session
 */
@Override
public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment env = context.createEnvironmentInstance().getTableEnvironment();
    return Arrays.asList(env.listUserDefinedFunctions());
}
/**
 * Sets the default database on a freshly created table environment of the session.
 *
 * @param session session to operate on
 * @param namePath dot-separated path; it is split on {@code .} before being passed on
 * @throws SqlExecutionException if the table environment rejects the path for any reason
 */
@Override
public void setDefaultDatabase(SessionContext session, String namePath) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment env = context.createEnvironmentInstance().getTableEnvironment();

    try {
        env.setDefaultDatabase(namePath.split("\\."));
    } catch (Throwable failure) {
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("No catalog and database with this name could be found.", failure);
    }
}
/**
 * Lists the catalogs known to a freshly created table environment of the session.
 *
 * @param session session to inspect
 * @return the catalog names reported by the table environment
 * @throws SqlExecutionException if no execution context can be created for the session
 */
@Override
public List<String> listCatalogs(SessionContext session) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment env = context.createEnvironmentInstance().getTableEnvironment();
    return Arrays.asList(env.listCatalogs());
}
/**
 * Lists the databases known to a freshly created table environment of the session.
 *
 * @param session session to inspect
 * @return the database names reported by the table environment
 * @throws SqlExecutionException if no execution context can be created for the session
 */
@Override
public List<String> listDatabases(SessionContext session) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment env = context.createEnvironmentInstance().getTableEnvironment();
    return Arrays.asList(env.listDatabases());
}
/**
 * Looks up the schema of a table in a freshly created table environment of the session.
 *
 * @param session session to inspect
 * @param name dot-separated table path; it is split on {@code .} before the lookup
 * @return the schema of the scanned table
 * @throws SqlExecutionException if the lookup fails for any reason
 */
@Override
public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment env = context.createEnvironmentInstance().getTableEnvironment();

    try {
        return env.scan(name.split("\\.")).getSchema();
    } catch (Throwable failure) {
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("No table with this name could be found.", failure);
    }
}
/**
 * Explains the given statement using a freshly created table environment of the session.
 *
 * @param session session to operate on
 * @param statement SQL query to explain
 * @return the explanation produced by the table environment
 * @throws SqlExecutionException if the statement cannot be parsed or explained
 */
@Override
public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final ExecutionContext.EnvironmentInstance instance = context.createEnvironmentInstance();
    final TableEnvironment env = instance.getTableEnvironment();

    try {
        final Table parsed = createTable(env, statement);
        // the optimization step behind explain() might reference UDFs during code
        // compilation, so it runs inside the context's class loader wrapper
        return context.wrapClassLoader(() -> env.explain(parsed));
    } catch (Throwable failure) {
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("Invalid SQL statement.", failure);
    }
}
/**
 * Returns completion hints for the statement at the given cursor position.
 *
 * <p>Completion is not wired up yet (see FLINK-8865), so the result is always empty.
 * A table environment is still created for the session before returning.
 *
 * @param session session to operate on
 * @param statement statement to complete
 * @param position cursor position within the statement
 * @return an empty list
 */
@Override
public List<String> completeStatement(SessionContext session, String statement, int position) {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();

    List<String> hints;
    try {
        // TODO: FLINK-8865
        // hints = Arrays.asList(tableEnv.getCompletionHints(statement, position));
        hints = Collections.emptyList();
    } catch (Throwable failure) {
        // completion problems must never crash the executor; they are only logged
        if (LOG.isDebugEnabled()) {
            LOG.debug("Could not complete statement at " + position + ":" + statement, failure);
        }
        hints = Collections.emptyList();
    }
    return hints;
}
/**
 * Submits the given query for the session and returns a descriptor of its result.
 *
 * @param session session to operate on
 * @param query SQL query to execute
 * @return descriptor of the result that can be retrieved later
 * @throws SqlExecutionException if the query is invalid or no context can be created
 */
@Override
public ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException {
    return executeQueryInternal(getOrCreateExecutionContext(session), query);
}
/**
 * Retrieves the pending changes of a non-materialized (changelog) result.
 *
 * @param session session the request belongs to
 * @param resultId identifier of the result
 * @return the changes retrieved from the changelog result
 * @throws SqlExecutionException if the result is unknown or is a materialized result
 */
@Override
public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session,
        String resultId) throws SqlExecutionException {
    final DynamicResult<?> stored = resultStore.getResult(resultId);
    if (stored == null) {
        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
    }

    // changes can only be streamed from results that are not materialized
    if (stored.isMaterialized()) {
        throw new SqlExecutionException("Invalid result retrieval mode.");
    }

    final ChangelogResult<?> changelog = (ChangelogResult<?>) stored;
    return changelog.retrieveChanges();
}
/**
 * Takes a snapshot of a materialized result using the given page size.
 *
 * @param session session the request belongs to
 * @param resultId identifier of the result
 * @param pageSize page size passed to the snapshot
 * @return the value returned by the materialized result's snapshot
 * @throws SqlExecutionException if the result is unknown or is not materialized
 */
@Override
public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
    final DynamicResult<?> stored = resultStore.getResult(resultId);
    if (stored == null) {
        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
    }

    // snapshots only exist for materialized results
    final boolean materialized = stored.isMaterialized();
    if (!materialized) {
        throw new SqlExecutionException("Invalid result retrieval mode.");
    }

    final MaterializedResult<?> table = (MaterializedResult<?>) stored;
    return table.snapshot(pageSize);
}
/**
 * Retrieves one page of a materialized result.
 *
 * @param resultId identifier of the result
 * @param page page to retrieve
 * @return the rows of the requested page
 * @throws SqlExecutionException if the result is unknown or is not materialized
 */
@Override
public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
    final DynamicResult<?> stored = resultStore.getResult(resultId);
    if (stored == null) {
        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
    }

    // pages only exist for materialized results
    final boolean materialized = stored.isMaterialized();
    if (!materialized) {
        throw new SqlExecutionException("Invalid result retrieval mode.");
    }

    final MaterializedResult<?> table = (MaterializedResult<?>) stored;
    return table.retrievePage(page);
}
/**
 * Cancels the query that belongs to the given result identifier.
 *
 * @param session session to operate on
 * @param resultId identifier of the result whose query is cancelled
 * @throws SqlExecutionException if the result is unknown or the cluster cannot be reached
 */
@Override
public void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException {
    cancelQueryInternal(getOrCreateExecutionContext(session), resultId);
}
/**
 * Submits the given update statement for the session.
 *
 * @param session session to operate on
 * @param statement SQL update statement to execute
 * @return descriptor of the target the program was deployed to
 * @throws SqlExecutionException if the statement is invalid or no context can be created
 */
@Override
public ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException {
    return executeUpdateInternal(getOrCreateExecutionContext(session), statement);
}
/**
 * Validates the session by creating an environment instance for it.
 *
 * @param session session to validate
 * @throws SqlExecutionException if no execution context can be created for the session
 */
@Override
public void validateSession(SessionContext session) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    // creating the instance is the check: it throws if the environment cannot be built
    context.createEnvironmentInstance();
}
/**
 * Parses the DDL text and registers every {@code CREATE TABLE} statement found in it
 * as an external table in a freshly created table environment of the session.
 *
 * @param session session to operate on
 * @param ddl DDL text; statements other than table creations are skipped
 * @throws SqlExecutionException if parsing or registration fails
 */
@Override
public void createTable(SessionContext session, String ddl) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final ExecutionContext.EnvironmentInstance instance = context.createEnvironmentInstance();
    final TableEnvironment env = instance.getTableEnvironment();

    try {
        for (SqlNodeInfo parsed : SqlJobUtil.parseSqlContext(ddl)) {
            if (parsed.getSqlNode() instanceof SqlCreateTable) {
                SqlJobUtil.registerExternalTable(env, parsed);
            }
        }
    } catch (Exception e) {
        throw new SqlExecutionException("Could not create a table from ddl: " + ddl, e);
    }
}
/**
 * Parses the DDL text and registers every {@code CREATE VIEW} statement found in it
 * as a table in a freshly created table environment of the session.
 *
 * <p>If the view declares its own column names, the sub-query is first registered under
 * a generated temporary name and then re-selected with each input column aliased to the
 * declared name at the same position.
 *
 * @param session session to operate on
 * @param ddl DDL text; statements other than view creations are skipped
 * @throws SqlExecutionException if parsing fails, the sub-query is invalid, or the number
 *     of declared column names differs from the number of columns of the sub-query
 */
@Override
public void createView(SessionContext session, String ddl) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
    final TableEnvironment tEnv = envInst.getTableEnvironment();

    try {
        for (SqlNodeInfo node : SqlJobUtil.parseSqlContext(ddl)) {
            if (!(node.getSqlNode() instanceof SqlCreateView)) {
                continue;
            }
            final SqlCreateView viewNode = (SqlCreateView) node.getSqlNode();

            Table view = tEnv.sqlQuery(viewNode.getSubQuerySql());

            final List<String> aliasNames = viewNode.getFieldNames();
            if (!aliasNames.isEmpty()) {
                final List<String> inputFields = view.getRelNode().getRowType().getFieldNames();

                // Validate with a regular exception only: an `assert` here would raise an
                // AssertionError under -ea, which bypasses the catch block below and
                // hides this descriptive message.
                if (aliasNames.size() != inputFields.size()) {
                    throw new RuntimeException("View definition and input fields not match: \nDef Fields: "
                        + aliasNames.toString() + "\nInput Fields: " + inputFields.toString());
                }

                // the sequence id keeps temporary names distinct across invocations
                final String tmpView =
                    "_tmp_" + viewNode.getName() + "_" + TMP_VIEW_SEQUENCE_ID.incrementAndGet();
                tEnv.registerTable(tmpView, view);

                final StringBuilder aliasClause = new StringBuilder();
                for (int idx = 0; idx < aliasNames.size(); ++idx) {
                    if (idx > 0) {
                        aliasClause.append(", ");
                    }
                    aliasClause
                        .append('`').append(inputFields.get(idx))
                        .append("` as `").append(aliasNames.get(idx)).append('`');
                }

                // Plain concatenation on purpose: the alias clause must actually end up
                // in the query, and FROM must be separated from the table name.
                view = tEnv.sqlQuery("SELECT " + aliasClause + " FROM " + tmpView);
            }

            tEnv.registerTable(viewNode.getName(), view);
        }
    } catch (Exception e) {
        throw new SqlExecutionException("Could not create a view from ddl: " + ddl, e);
    }
}
/**
 * Parses the DDL text and registers every {@code CREATE FUNCTION} statement found in it
 * in a freshly created table environment of the session.
 *
 * <p>The function class is instantiated with the class loader of the execution context
 * and registered as a table, aggregate or scalar function depending on its type.
 *
 * @param session session to operate on
 * @param ddl DDL text; statements other than function creations are skipped
 * @throws SqlExecutionException if parsing, instantiation or registration fails, or if
 *     the class is none of the three supported function kinds
 */
@Override
public void createFunction(SessionContext session, String ddl) throws SqlExecutionException {
    final ExecutionContext<?> context = getOrCreateExecutionContext(session);
    final ExecutionContext.EnvironmentInstance instance = context.createEnvironmentInstance();
    final TableEnvironment env = instance.getTableEnvironment();

    try {
        for (SqlNodeInfo parsed : SqlJobUtil.parseSqlContext(ddl)) {
            if (!(parsed.getSqlNode() instanceof SqlCreateFunction)) {
                continue;
            }

            final SqlCreateFunction definition = (SqlCreateFunction) parsed.getSqlNode();
            final String name = definition.getFunctionName().toString();
            final String className = definition.getClassName();
            final UserDefinedFunction function =
                createUserDefinedFunction(context.getClassLoader(), name, className);

            // dispatch on the concrete function kind to pick the matching registration
            if (function instanceof TableFunction) {
                final TableFunction<?> tableFunction = (TableFunction<?>) function;
                env.registerFunction(name, tableFunction);
            } else if (function instanceof AggregateFunction) {
                final AggregateFunction<?, ?> aggregateFunction = (AggregateFunction<?, ?>) function;
                env.registerFunction(name, aggregateFunction);
            } else if (function instanceof ScalarFunction) {
                final ScalarFunction scalarFunction = (ScalarFunction) function;
                env.registerFunction(name, scalarFunction);
            } else {
                // TODO: Support Hive UDX
                throw new RuntimeException("Couldn't match the type of UDF class: " + className);
            }
        }
    } catch (Exception e) {
        throw new SqlExecutionException("Could not create a udx from ddl: " + ddl, e);
    }
}
/**
 * Stops the executor for the session by cancelling the query of every stored result.
 * Failures of individual cancellations are ignored so that the remaining results are
 * still processed.
 *
 * @param session session used for the cancellations
 */
@Override
public void stop(SessionContext session) {
    resultStore.getResults().forEach(id -> {
        try {
            cancelQuery(session, id);
        } catch (Throwable ignored) {
            // best effort: keep cleaning up the remaining results
        }
    });
}
// --------------------------------------------------------------------------------------------
/**
 * Closes and removes the stored result, then asks the cluster to cancel the job whose
 * id is derived from the result identifier.
 *
 * @param context execution context providing the cluster descriptor and cluster id
 * @param resultId identifier of the result; also the hex representation of the job id
 * @throws SqlExecutionException if the result is unknown or the cluster cannot be
 *     located or retrieved
 */
private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
    final DynamicResult<T> stored = resultStore.getResult(resultId);
    if (stored == null) {
        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
    }

    // retrieval is stopped and the result dropped before the cluster is contacted
    LOG.info("Cancelling job {} and result retrieval.", resultId);
    stored.close();
    resultStore.removeResult(resultId);

    // now stop the Flink job itself
    try (final ClusterDescriptor<T> descriptor = context.createClusterDescriptor()) {
        ClusterClient<T> client = null;
        try {
            client = descriptor.retrieve(context.getClusterId());
            try {
                client.cancel(new JobID(StringUtils.hexStringToByte(resultId)));
            } catch (Throwable ignored) {
                // the job might have finished earlier
            }
        } catch (Exception e) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
        } finally {
            if (client != null) {
                try {
                    client.shutdown();
                } catch (Exception ignored) {
                    // a failing shutdown must not mask the outcome of the cancellation
                }
            }
        }
    } catch (SqlExecutionException e) {
        // already carries a meaningful message; pass it through untouched
        throw e;
    } catch (Exception e) {
        throw new SqlExecutionException("Could not locate a cluster.", e);
    }
}
/**
 * Applies the update statement to a fresh environment instance, builds the job graph
 * and deploys it in a blocking fashion.
 *
 * @param context execution context of the session
 * @param statement SQL update statement
 * @return descriptor built from the cluster id, the job id and the web interface URL
 * @throws SqlExecutionException if the statement is invalid
 */
private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
    final ExecutionContext.EnvironmentInstance instance = context.createEnvironmentInstance();

    applyUpdate(context, instance.getTableEnvironment(), instance.getQueryConfig(), statement);

    // the job is named after the session and the statement it executes
    final String jobName = context.getSessionContext().getName() + ": " + statement;

    final JobGraph graph;
    try {
        // building the job graph involves an optimization step that might reference
        // UDFs during code compilation, hence the class loader wrapper
        graph = context.wrapClassLoader(() -> instance.createJobGraph(jobName));
    } catch (Throwable failure) {
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("Invalid SQL statement.", failure);
    }

    final BasicResult<C> deploymentResult = new BasicResult<>();
    final ProgramDeployer<C> deployer = new ProgramDeployer<>(
        context, jobName, graph, deploymentResult, false);

    // run() is invoked directly, so the deployment blocks the calling thread
    deployer.run();

    return ProgramTargetDescriptor.of(
        deploymentResult.getClusterId(),
        graph.getJobID(),
        deploymentResult.getWebInterfaceUrl());
}
/**
 * Translates the query into a job graph, stores a dynamic result under the job id and
 * starts the result retrieval.
 *
 * @param context execution context of the session
 * @param query SQL query
 * @return descriptor holding the result id, the schema without time attributes and the
 *     materialization flag of the result
 * @throws SqlExecutionException if the query is invalid
 */
private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, String query) {
    final ExecutionContext.EnvironmentInstance instance = context.createEnvironmentInstance();

    final Table queryTable = createTable(instance.getTableEnvironment(), query);

    // the result is created first because its sink is needed while building the job graph
    final DynamicResult<C> dynamicResult = resultStore.createResult(
        context.getMergedEnvironment(),
        removeTimeAttributes(queryTable.getSchema()),
        instance.getExecutionConfig());

    // the job is named after the session and the query it executes
    final String jobName = context.getSessionContext().getName() + ": " + query;

    final JobGraph graph;
    try {
        // building the job graph involves an optimization step that might reference
        // UDFs during code compilation, hence the class loader wrapper
        graph = context.wrapClassLoader(() -> {
            queryTable.writeToSink(dynamicResult.getTableSink(), instance.getQueryConfig());
            return instance.createJobGraph(jobName);
        });
    } catch (Throwable failure) {
        // the result is not in the store yet, so it has to be closed here
        dynamicResult.close();
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("Invalid SQL query.", failure);
    }

    // the job id doubles as the unique result id for now
    final String resultId = graph.getJobID().toString();
    resultStore.storeResult(resultId, dynamicResult);

    final ProgramDeployer<C> deployer = new ProgramDeployer<>(
        context, jobName, graph, dynamicResult, true);
    dynamicResult.startRetrieval(deployer);

    return new ResultDescriptor(
        resultId,
        removeTimeAttributes(queryTable.getSchema()),
        dynamicResult.isMaterialized());
}
/**
 * Parses and validates the given query in the given table environment.
 *
 * @param tableEnv table environment used for the translation
 * @param selectQuery SQL query to translate
 * @return the table representing the query
 * @throws SqlExecutionException if the table environment rejects the query
 */
private Table createTable(TableEnvironment tableEnv, String selectQuery) {
    final Table parsed;
    try {
        parsed = tableEnv.sqlQuery(selectQuery);
    } catch (Throwable failure) {
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("Invalid SQL statement.", failure);
    }
    return parsed;
}
/**
 * Parses and validates the update statement in the given table environment using the
 * given query configuration.
 *
 * @param context execution context whose class loader wrapper is used
 * @param tableEnv table environment the statement is applied to
 * @param queryConfig query configuration passed along with the statement
 * @param updateStatement SQL update statement
 * @throws SqlExecutionException if the table environment rejects the statement
 */
private <C> void applyUpdate(ExecutionContext<C> context, TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
    try {
        // the update requires an optimization step that might reference UDFs during
        // code compilation, hence the class loader wrapper; there is no value to return
        context.wrapClassLoader(() -> {
            tableEnv.sqlUpdate(updateStatement, queryConfig);
            return null;
        });
    } catch (Throwable failure) {
        // every failure is translated so that the executor itself keeps running
        throw new SqlExecutionException("Invalid SQL update statement.", failure);
    }
}
/**
 * Returns the cached execution context if it was created for an equal session context,
 * otherwise creates a new one and caches it.
 *
 * @param session session the context is requested for
 * @return the execution context matching the session
 * @throws SqlExecutionException if the context cannot be created
 */
private synchronized ExecutionContext<?> getOrCreateExecutionContext(SessionContext session) throws SqlExecutionException {
    final boolean reusable =
        executionContext != null && executionContext.getSessionContext().equals(session);
    if (reusable) {
        return executionContext;
    }

    try {
        executionContext = new ExecutionContext<>(defaultEnvironment, session, dependencies,
            flinkConfig, commandLineOptions, commandLines);
    } catch (Throwable failure) {
        // a broken configuration must not crash the executor
        throw new SqlExecutionException("Could not create execution context.", failure);
    }
    return executionContext;
}
// --------------------------------------------------------------------------------------------
/**
 * Collects the JAR dependencies: every explicitly given JAR plus every {@code .jar} file
 * located directly inside one of the given library directories. Each JAR is checked
 * before it is added.
 *
 * @param jars URLs of individual JAR files
 * @param libraries URLs of directories to scan for JAR files
 * @return the collected JAR URLs, explicit JARs first
 * @throws SqlClientException if any JAR or directory cannot be loaded
 */
private static List<URL> discoverDependencies(List<URL> jars, List<URL> libraries) {
    final List<URL> collected = new ArrayList<>();
    try {
        // explicitly listed JAR files
        for (URL jarUrl : jars) {
            JobWithJars.checkJarFile(jarUrl);
            collected.add(jarUrl);
        }

        // JAR files inside the library directories
        for (URL libraryUrl : libraries) {
            final File directory = new File(libraryUrl.toURI());
            if (!directory.isDirectory()) {
                throw new SqlClientException("Directory expected: " + directory);
            }
            if (!directory.canRead()) {
                throw new SqlClientException("Directory cannot be read: " + directory);
            }

            final File[] entries = directory.listFiles();
            if (entries == null) {
                throw new SqlClientException("Directory cannot be read: " + directory);
            }

            for (File entry : entries) {
                // skip anything that is not a regular file with a .jar suffix
                final boolean isJar =
                    entry.isFile() && entry.getAbsolutePath().toLowerCase().endsWith(".jar");
                if (!isJar) {
                    continue;
                }
                final URL jarUrl = entry.toURI().toURL();
                JobWithJars.checkJarFile(jarUrl);
                collected.add(jarUrl);
            }
        }
    } catch (Exception e) {
        throw new SqlClientException("Could not load all required JAR files.", e);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Using the following dependencies: {}", collected);
    }
    return collected;
}
/**
 * Merges the standard run command options with the run options contributed by the
 * given custom command lines.
 *
 * @param commandLines custom command lines whose run options are collected
 * @return the merged options
 */
private static Options collectCommandLineOptions(List<CustomCommandLine<?>> commandLines) {
    final Options contributed = new Options();
    commandLines.forEach(commandLine -> commandLine.addRunOptions(contributed));

    final Options runOptions = CliFrontendParser.getRunCommandOptions();
    return CliFrontendParser.mergeOptions(runOptions, contributed);
}
/**
 * Returns the schema that {@code TableSchemaUtil.withoutTimeAttributes} produces for the
 * given schema.
 *
 * @param schema schema to convert
 * @return the converted schema
 */
private static TableSchema removeTimeAttributes(TableSchema schema) {
    final TableSchema converted = TableSchemaUtil.withoutTimeAttributes(schema);
    return converted;
}
/**
 * Instantiates a user-defined function from its class name.
 *
 * <p>The name and class are written into descriptor properties (with {@code from} set to
 * {@code class}), turned into a function descriptor and passed to the function service.
 *
 * @param classLoader class loader handed to the function service
 * @param funcName name stored under the {@code name} property
 * @param funcDef class name stored under the {@code class} property
 * @return the function created by the function service
 */
private static UserDefinedFunction createUserDefinedFunction(ClassLoader classLoader, String funcName, String funcDef) {
    final DescriptorProperties functionProperties = new DescriptorProperties();
    functionProperties.putString("name", funcName);
    functionProperties.putString("from", "class");
    functionProperties.putString("class", funcDef);

    final FunctionDescriptor descriptor = FunctionEntry.create(functionProperties).getDescriptor();
    return FunctionService.createFunction(descriptor, classLoader, false);
}
}