blob: 176939c915734b549b43f24fc0a8b47bef0cffe5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.jdbc;
import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import javax.sql.DataSource;
import static java.util.Objects.requireNonNull;
/**
* Implementation of {@link Schema} that is backed by a JDBC data source.
*
* <p>The tables in the JDBC data source appear to be tables in this schema;
* queries against this schema are executed against those tables, pushing down
* as much as possible of the query logic to SQL.
*/
public class JdbcSchema implements Schema, Wrapper {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSchema.class);
final DataSource dataSource;
final @Nullable String catalog;
final @Nullable String schema;
public final SqlDialect dialect;
final JdbcConvention convention;
private @Nullable ImmutableMap<String, JdbcTable> tableMap;
private final boolean snapshot;
@Experimental
public static final ThreadLocal<@Nullable Foo> THREAD_METADATA = new ThreadLocal<>();
private static final Ordering<Iterable<Integer>> VERSION_ORDERING =
Ordering.<Integer>natural().lexicographical();
/**
* Creates a JDBC schema.
*
* @param dataSource Data source
* @param dialect SQL dialect
* @param convention Calling convention
* @param catalog Catalog name, or null
* @param schema Schema name pattern
*/
public JdbcSchema(DataSource dataSource, SqlDialect dialect,
JdbcConvention convention, @Nullable String catalog, @Nullable String schema) {
this(dataSource, dialect, convention, catalog, schema, null);
}
private JdbcSchema(DataSource dataSource, SqlDialect dialect,
JdbcConvention convention, @Nullable String catalog, @Nullable String schema,
@Nullable ImmutableMap<String, JdbcTable> tableMap) {
this.dataSource = requireNonNull(dataSource, "dataSource");
this.dialect = requireNonNull(dialect, "dialect");
this.convention = convention;
this.catalog = catalog;
this.schema = schema;
this.tableMap = tableMap;
this.snapshot = tableMap != null;
}
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
DataSource dataSource,
@Nullable String catalog,
@Nullable String schema) {
return create(parentSchema, name, dataSource,
SqlDialectFactoryImpl.INSTANCE, catalog, schema);
}
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
DataSource dataSource,
SqlDialectFactory dialectFactory,
@Nullable String catalog,
@Nullable String schema) {
final Expression expression =
Schemas.subSchemaExpression(parentSchema, name, JdbcSchema.class);
final SqlDialect dialect = createDialect(dialectFactory, dataSource);
final JdbcConvention convention =
JdbcConvention.of(dialect, expression, name);
return new JdbcSchema(dataSource, dialect, convention, catalog, schema);
}
/**
* Creates a JdbcSchema, taking credentials from a map.
*
* @param parentSchema Parent schema
* @param name Name
* @param operand Map of property/value pairs
* @return A JdbcSchema
*/
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
DataSource dataSource;
try {
final String dataSourceName = (String) operand.get("dataSource");
if (dataSourceName != null) {
dataSource =
AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
} else {
final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
final String jdbcDriver = (String) operand.get("jdbcDriver");
final String jdbcUser = (String) operand.get("jdbcUser");
final String jdbcPassword = (String) operand.get("jdbcPassword");
dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
}
} catch (Exception e) {
throw new RuntimeException("Error while reading dataSource", e);
}
String jdbcCatalog = (String) operand.get("jdbcCatalog");
String jdbcSchema = (String) operand.get("jdbcSchema");
String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
return JdbcSchema.create(
parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
SqlDialectFactory factory =
AvaticaUtils.instantiatePlugin(SqlDialectFactory.class,
sqlDialectFactory);
return JdbcSchema.create(parentSchema, name, dataSource, factory,
jdbcCatalog, jdbcSchema);
}
}
/**
* Returns a suitable SQL dialect for the given data source.
*
* @param dataSource The data source
*
* @deprecated Use {@link #createDialect(SqlDialectFactory, DataSource)} instead
*/
@Deprecated // to be removed before 2.0
public static SqlDialect createDialect(DataSource dataSource) {
return createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
}
/** Returns a suitable SQL dialect for the given data source. */
public static SqlDialect createDialect(SqlDialectFactory dialectFactory,
DataSource dataSource) {
return JdbcUtils.DialectPool.INSTANCE.get(dialectFactory, dataSource);
}
/** Creates a JDBC data source with the given specification. */
public static DataSource dataSource(String url, @Nullable String driverClassName,
@Nullable String username, @Nullable String password) {
if (url.startsWith("jdbc:hsqldb:")) {
// Prevent hsqldb from screwing up java.util.logging.
System.setProperty("hsqldb.reconfig_logging", "false");
}
return JdbcUtils.DataSourcePool.INSTANCE.get(url, driverClassName, username,
password);
}
@Override public boolean isMutable() {
return false;
}
@Override public Schema snapshot(SchemaVersion version) {
return new JdbcSchema(dataSource, dialect, convention, catalog, schema,
tableMap);
}
// Used by generated code.
public DataSource getDataSource() {
return dataSource;
}
@Override public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
requireNonNull(parentSchema, "parentSchema must not be null for JdbcSchema");
return Schemas.subSchemaExpression(parentSchema, name, JdbcSchema.class);
}
protected Multimap<String, Function> getFunctions() {
// TODO: populate map from JDBC metadata
return ImmutableMultimap.of();
}
@Override public final Collection<Function> getFunctions(String name) {
return getFunctions().get(name); // never null
}
@Override public final Set<String> getFunctionNames() {
return getFunctions().keySet();
}
private ImmutableMap<String, JdbcTable> computeTables() {
Connection connection = null;
ResultSet resultSet = null;
try {
connection = dataSource.getConnection();
final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
final String catalog = catalogSchema.left;
final String schema = catalogSchema.right;
final Iterable<MetaImpl.MetaTable> tableDefs;
Foo threadMetadata = THREAD_METADATA.get();
if (threadMetadata != null) {
tableDefs = threadMetadata.apply(catalog, schema);
} else {
final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalog, schema, null, null);
while (resultSet.next()) {
final String catalogName = resultSet.getString(1);
final String schemaName = resultSet.getString(2);
final String tableName = resultSet.getString(3);
final String tableTypeName = resultSet.getString(4);
tableDefList.add(
new MetaImpl.MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
tableDefs = tableDefList;
}
final ImmutableMap.Builder<String, JdbcTable> builder =
ImmutableMap.builder();
for (MetaImpl.MetaTable tableDef : tableDefs) {
// Clean up table type. In particular, this ensures that 'SYSTEM TABLE',
// returned by Phoenix among others, maps to TableType.SYSTEM_TABLE.
// We know enum constants are upper-case without spaces, so we can't
// make things worse.
//
// PostgreSQL returns tableTypeName==null for pg_toast* tables
// This can happen if you start JdbcSchema off a "public" PG schema
// The tables are not designed to be queried by users, however we do
// not filter them as we keep all the other table types.
final String tableTypeName2 =
tableDef.tableType == null
? null
: tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
final TableType tableType =
Util.enumVal(TableType.OTHER, tableTypeName2);
if (tableType == TableType.OTHER && tableTypeName2 != null) {
LOGGER.info("Unknown table type: {}", tableTypeName2);
}
final JdbcTable table =
new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
tableDef.tableName, tableType);
builder.put(tableDef.tableName, table);
}
return builder.build();
} catch (SQLException e) {
throw new RuntimeException(
"Exception while reading tables", e);
} finally {
close(connection, null, resultSet);
}
}
/** Returns [major, minor] version from a database metadata. */
private static List<Integer> version(DatabaseMetaData metaData) throws SQLException {
return ImmutableList.of(metaData.getJDBCMajorVersion(),
metaData.getJDBCMinorVersion());
}
/** Returns a pair of (catalog, schema) for the current connection. */
private Pair<@Nullable String, @Nullable String> getCatalogSchema(Connection connection)
throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
final List<Integer> version41 = ImmutableList.of(4, 1); // JDBC 4.1
String catalog = this.catalog;
String schema = this.schema;
final boolean jdbc41OrAbove =
VERSION_ORDERING.compare(version(metaData), version41) >= 0;
if (catalog == null && jdbc41OrAbove) {
// From JDBC 4.1, catalog and schema can be retrieved from the connection
// object, hence try to get it from there if it was not specified by user
catalog = connection.getCatalog();
}
if (schema == null && jdbc41OrAbove) {
schema = connection.getSchema();
if ("".equals(schema)) {
schema = null; // PostgreSQL returns useless "" sometimes
}
}
if ((catalog == null || schema == null)
&& metaData.getDatabaseProductName().equals("PostgreSQL")) {
final String sql = "select current_database(), current_schema()";
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet.next()) {
catalog = resultSet.getString(1);
schema = resultSet.getString(2);
}
}
}
return Pair.of(catalog, schema);
}
@Override public @Nullable Table getTable(String name) {
return getTableMap(false).get(name);
}
private synchronized ImmutableMap<String, JdbcTable> getTableMap(
boolean force) {
if (force || tableMap == null) {
tableMap = computeTables();
}
return tableMap;
}
RelProtoDataType getRelDataType(String catalogName, String schemaName,
String tableName) throws SQLException {
Connection connection = null;
try {
connection = dataSource.getConnection();
DatabaseMetaData metaData = connection.getMetaData();
return getRelDataType(metaData, catalogName, schemaName, tableName);
} finally {
close(connection, null, null);
}
}
RelProtoDataType getRelDataType(DatabaseMetaData metaData, String catalogName,
String schemaName, String tableName) throws SQLException {
final ResultSet resultSet =
metaData.getColumns(catalogName, schemaName, tableName, null);
// Temporary type factory, just for the duration of this method. Allowable
// because we're creating a proto-type, not a type; before being used, the
// proto-type will be copied into a real type factory.
final RelDataTypeFactory typeFactory =
new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
while (resultSet.next()) {
final String columnName = requireNonNull(resultSet.getString(4), "columnName");
final int dataType = resultSet.getInt(5);
final String typeString = resultSet.getString(6);
final int precision;
final int scale;
switch (SqlType.valueOf(dataType)) {
case TIMESTAMP:
case TIME:
precision = resultSet.getInt(9); // SCALE
scale = 0;
break;
default:
precision = resultSet.getInt(7); // SIZE
scale = resultSet.getInt(9); // SCALE
break;
}
RelDataType sqlType =
sqlType(typeFactory, dataType, precision, scale, typeString);
boolean nullable = resultSet.getInt(11) != DatabaseMetaData.columnNoNulls;
fieldInfo.add(columnName, sqlType).nullable(nullable);
}
resultSet.close();
return RelDataTypeImpl.proto(fieldInfo.build());
}
private static RelDataType sqlType(RelDataTypeFactory typeFactory, int dataType,
int precision, int scale, @Nullable String typeString) {
// Fall back to ANY if type is unknown
final SqlTypeName sqlTypeName =
Util.first(SqlTypeName.getNameForJdbcType(dataType), SqlTypeName.ANY);
switch (sqlTypeName) {
case ARRAY:
RelDataType component = null;
if (typeString != null && typeString.endsWith(" ARRAY")) {
// E.g. hsqldb gives "INTEGER ARRAY", so we deduce the component type
// "INTEGER".
final String remaining =
typeString.substring(0, typeString.length() - " ARRAY".length());
component = parseTypeString(typeFactory, remaining);
}
if (component == null) {
component =
typeFactory.createTypeWithNullability(
typeFactory.createSqlType(SqlTypeName.ANY), true);
}
return typeFactory.createArrayType(component, -1);
default:
break;
}
if (precision >= 0
&& scale >= 0
&& sqlTypeName.allowsPrecScale(true, true)) {
return typeFactory.createSqlType(sqlTypeName, precision, scale);
} else if (precision >= 0 && sqlTypeName.allowsPrecNoScale()) {
return typeFactory.createSqlType(sqlTypeName, precision);
} else {
assert sqlTypeName.allowsNoPrecNoScale();
return typeFactory.createSqlType(sqlTypeName);
}
}
/** Given "INTEGER", returns BasicSqlType(INTEGER).
* Given "VARCHAR(10)", returns BasicSqlType(VARCHAR, 10).
* Given "NUMERIC(10, 2)", returns BasicSqlType(NUMERIC, 10, 2). */
private static RelDataType parseTypeString(RelDataTypeFactory typeFactory,
String typeString) {
int precision = -1;
int scale = -1;
int open = typeString.indexOf("(");
if (open >= 0) {
int close = typeString.indexOf(")", open);
if (close >= 0) {
String rest = typeString.substring(open + 1, close);
typeString = typeString.substring(0, open);
int comma = rest.indexOf(",");
if (comma >= 0) {
precision = Integer.parseInt(rest.substring(0, comma));
scale = Integer.parseInt(rest.substring(comma));
} else {
precision = Integer.parseInt(rest);
}
}
}
try {
final SqlTypeName typeName = SqlTypeName.valueOf(typeString);
return typeName.allowsPrecScale(true, true)
? typeFactory.createSqlType(typeName, precision, scale)
: typeName.allowsPrecScale(true, false)
? typeFactory.createSqlType(typeName, precision)
: typeFactory.createSqlType(typeName);
} catch (IllegalArgumentException e) {
return typeFactory.createTypeWithNullability(
typeFactory.createSqlType(SqlTypeName.ANY), true);
}
}
@Override public Set<String> getTableNames() {
// This method is called during a cache refresh. We can take it as a signal
// that we need to re-build our own cache.
return getTableMap(!snapshot).keySet();
}
protected Map<String, RelProtoDataType> getTypes() {
// TODO: populate map from JDBC metadata
return ImmutableMap.of();
}
@Override public @Nullable RelProtoDataType getType(String name) {
return getTypes().get(name);
}
@Override public Set<String> getTypeNames() {
//noinspection RedundantCast
return (Set<String>) getTypes().keySet();
}
@Override public @Nullable Schema getSubSchema(String name) {
// JDBC does not support sub-schemas.
return null;
}
@Override public Set<String> getSubSchemaNames() {
return ImmutableSet.of();
}
@Override public <T extends Object> @Nullable T unwrap(Class<T> clazz) {
if (clazz.isInstance(this)) {
return clazz.cast(this);
}
if (clazz == DataSource.class) {
return clazz.cast(getDataSource());
}
return null;
}
private static void close(
@Nullable Connection connection,
@Nullable Statement statement,
@Nullable ResultSet resultSet) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
// ignore
}
}
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
// ignore
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
// ignore
}
}
}
/** Schema factory that creates a
* {@link org.apache.calcite.adapter.jdbc.JdbcSchema}.
*
* <p>This allows you to create a jdbc schema inside a model.json file, like
* this:
*
* <blockquote><pre>
* {
* "version": "1.0",
* "defaultSchema": "FOODMART_CLONE",
* "schemas": [
* {
* "name": "FOODMART_CLONE",
* "type": "custom",
* "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
* "operand": {
* "jdbcDriver": "com.mysql.jdbc.Driver",
* "jdbcUrl": "jdbc:mysql://localhost/foodmart",
* "jdbcUser": "foodmart",
* "jdbcPassword": "foodmart"
* }
* }
* ]
* }</pre></blockquote>
*/
public static class Factory implements SchemaFactory {
public static final Factory INSTANCE = new Factory();
private Factory() {}
@Override public Schema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
return JdbcSchema.create(parentSchema, name, operand);
}
}
/** Do not use. */
@Experimental
public interface Foo
extends BiFunction<@Nullable String, @Nullable String, Iterable<MetaImpl.MetaTable>> {
}
}