blob: 8ce524bd63fb8850a89923eb54cf24fd06960b76 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.sql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.sql.planner.RelNodeVisitor;
import org.apache.apex.malhar.sql.schema.ApexSQLTable;
import org.apache.apex.malhar.sql.table.Endpoint;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.ScalarFunction;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.util.Util;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.datatorrent.api.DAG;
/**
 * SQL Execution Environment for the SQL integration API of Apex.
 * This exposes Calcite functionality in a simple way. Most of the APIs follow the builder pattern, which makes it
 * easier to construct a DAG using this object.
 *
 * Eg.
 * <pre>
 * SQLExecEnvironment.getEnvironment()
 *     .registerTable("TABLE1", Object of type {@link Endpoint})
 *     .registerTable("TABLE2", Object of type {@link Endpoint})
 *     .executeSQL(dag, "INSERT INTO TABLE2 SELECT STREAM * FROM TABLE1");
 * </pre>
 *
 * Above code will evaluate the SQL statement and convert the resultant relational algebra to a sub-DAG.
 *
 * @since 3.6.0
 */
@InterfaceStability.Evolving
public class SQLExecEnvironment
{
private static final Logger logger = LoggerFactory.getLogger(SQLExecEnvironment.class);
private final JavaTypeFactoryImpl typeFactory;
private SchemaPlus schema = Frameworks.createRootSchema(true);
/**
* Construct SQL Execution Environment which works on given DAG objec.
*/
private SQLExecEnvironment()
{
this.typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
}
/**
* Given SQLExec{@link SQLExecEnvironment} object for given {@link DAG}.
*
* @return Returns {@link SQLExecEnvironment} object
*/
public static SQLExecEnvironment getEnvironment()
{
return new SQLExecEnvironment();
}
/**
* Use given model file to initialize {@link SQLExecEnvironment}.
* The model file contains definitions of endpoints and data formats.
* Example of file format is like following:
* <pre>
* {
* version: '1.0',
* defaultSchema: 'APEX',
* schemas: [{
* name: 'APEX',
* tables: [
* {
* name: 'ORDERS',
* type: 'custom',
* factory: 'org.apache.apex.malhar.sql.schema.ApexSQLTableFactory',
* stream: {
* stream: true
* },
* operand: {
* endpoint: 'file',
* messageFormat: 'csv',
* endpointOperands: {
* directory: '/tmp/input'
* },
* messageFormatOperands: {
* schema: '{"separator":",","quoteChar":"\\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}'
* }
* }
* }
* ]
* }]
* }
* </pre>
*
* @param model String content of model file.
* @return Returns this {@link SQLExecEnvironment}
*/
public SQLExecEnvironment withModel(String model)
{
if (model == null) {
return this;
}
Properties p = new Properties();
p.put("model", "inline:" + model);
try (Connection connection = DriverManager.getConnection("jdbc:calcite:", p)) {
CalciteConnection conn = connection.unwrap(CalciteConnection.class);
this.schema = conn.getRootSchema().getSubSchema(connection.getSchema());
} catch (SQLException e) {
throw new RuntimeException(e);
}
return this;
}
/**
* Register a table using {@link Endpoint} with this {@link SQLExecEnvironment}
*
* @param name Name of the table that needs to be present with SQL Statement.
* @param endpoint Object of type {@link Endpoint}
*
* @return Returns this {@link SQLExecEnvironment}
*/
public SQLExecEnvironment registerTable(String name, Endpoint endpoint)
{
Preconditions.checkNotNull(name, "Table name cannot be null");
registerTable(name, new ApexSQLTable(schema, name, endpoint));
return this;
}
/**
* Register a table using {@link Table} with this {@link SQLExecEnvironment}
*
* @param name Name of the table that needs to be present with SQL statement
* @param table Object of type {@link Table}
*
* @return Returns this {@link SQLExecEnvironment}
*/
public SQLExecEnvironment registerTable(String name, Table table)
{
Preconditions.checkNotNull(name, "Table name cannot be null");
schema.add(name, table);
return this;
}
/**
* Register custom function with this {@link SQLExecEnvironment}
*
* @param name Name of the scalar SQL function that needs made available to SQL Statement
* @param fn Object of type {@link Function}
*
* @return Returns this {@link SQLExecEnvironment}
*/
public SQLExecEnvironment registerFunction(String name, Function fn)
{
Preconditions.checkNotNull(name, "Function name cannot be null");
schema.add(name, fn);
return this;
}
/**
* Register custom function from given static method with this {@link SQLExecEnvironment}
*
* @param name Name of the scalar SQL function that needs make available to SQL Statement
* @param clazz {@link Class} which contains given static method
* @param methodName Name of the method from given clazz
*
* @return Return this {@link SQLExecEnvironment}
*/
public SQLExecEnvironment registerFunction(String name, Class clazz, String methodName)
{
Preconditions.checkNotNull(name, "Function name cannot be null");
ScalarFunction scalarFunction = ScalarFunctionImpl.create(clazz, methodName);
return registerFunction(name, scalarFunction);
}
/**
* This is the main method takes SQL statement as input and contructs a DAG using contructs registered with this
* {@link SQLExecEnvironment}.
*
* @param sql SQL statement that should be converted to a DAG.
*/
public void executeSQL(DAG dag, String sql)
{
FrameworkConfig config = buildFrameWorkConfig();
Planner planner = Frameworks.getPlanner(config);
try {
logger.info("Parsing SQL statement: {}", sql);
SqlNode parsedTree = planner.parse(sql);
SqlNode validatedTree = planner.validate(parsedTree);
RelNode relationalTree = planner.rel(validatedTree).rel;
logger.info("RelNode relationalTree generate from SQL statement is:\n {}",
Util.toLinux(RelOptUtil.toString(relationalTree)));
RelNodeVisitor visitor = new RelNodeVisitor(dag, typeFactory);
visitor.traverse(relationalTree);
} catch (Exception e) {
throw Throwables.propagate(e);
} finally {
planner.close();
}
}
/**
* Method method build a calcite framework configuration for calcite to parse SQL and generate relational tree
* out of it.
* @return FrameworkConfig
*/
private FrameworkConfig buildFrameWorkConfig()
{
List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
sqlOperatorTables.add(SqlStdOperatorTable.instance());
sqlOperatorTables
.add(new CalciteCatalogReader(CalciteSchema.from(schema), false, Collections.<String>emptyList(), typeFactory));
return Frameworks.newConfigBuilder().defaultSchema(schema)
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
}
}