blob: 3273f4b7591c21f54f89c15778471d6686f9bd56 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.jdbc.client;
import org.apache.doris.analysis.DefaultValueExprDef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.Util;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
public class JdbcMySQLClient extends JdbcClient {
private boolean convertDateToNull = false;
private boolean isDoris = false;
protected JdbcMySQLClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
convertDateToNull = isConvertDatetimeToNull(jdbcClientConfig);
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = super.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SHOW VARIABLES LIKE 'version_comment'");
if (rs.next()) {
String versionComment = rs.getString("Value");
isDoris = versionComment.toLowerCase().contains("doris");
}
} catch (SQLException e) {
throw new JdbcClientException("Failed to determine MySQL Version Comment", e);
} finally {
close(rs, stmt, conn);
}
}
@Override
protected String getDatabaseQuery() {
return "SHOW DATABASES";
}
@Override
protected List<String> getSpecifiedDatabase(Connection conn) {
List<String> databaseNames = Lists.newArrayList();
try {
databaseNames.add(conn.getCatalog());
} catch (SQLException e) {
throw new JdbcClientException("failed to get specified database name from jdbc", e);
} finally {
close(conn);
}
return databaseNames;
}
@Override
protected void processTable(String dbName, String tableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = null;
ResultSet rs = null;
try {
conn = super.getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
rs = databaseMetaData.getTables(dbName, null, tableName, tableTypes);
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
} finally {
close(rs, conn);
}
}
@Override
protected String[] getTableTypes() {
return new String[] {"TABLE", "VIEW", "SYSTEM VIEW"};
}
@Override
protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName,
String tableName) throws SQLException {
return databaseMetaData.getColumns(schemaName, null, tableName, null);
}
/**
* get all columns of one table
*/
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = com.google.common.collect.Lists.newArrayList();
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
List<String> primaryKeys = getPrimaryKeys(databaseMetaData, catalogName, dbName, tableName);
Map<String, String> mapFieldtoType = null;
while (rs.next()) {
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setDataType(rs.getInt("DATA_TYPE"));
// in mysql-jdbc-connector-8.0.*, TYPE_NAME of the HLL column in doris will be "UNKNOWN"
// in mysql-jdbc-connector-5.1.*, TYPE_NAME of the HLL column in doris will be "HLL"
// in mysql-jdbc-connector-8.0.*, TYPE_NAME of BITMAP column in doris will be "BIT"
// in mysql-jdbc-connector-5.1.*, TYPE_NAME of BITMAP column in doris will be "BITMAP"
field.setDataTypeName(rs.getString("TYPE_NAME"));
if (isDoris) {
mapFieldtoType = getColumnsDataTypeUseQuery(dbName, tableName);
field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
}
field.setKey(primaryKeys.contains(field.getColumnName()));
field.setColumnSize(rs.getInt("COLUMN_SIZE"));
field.setDecimalDigits(rs.getInt("DECIMAL_DIGITS"));
field.setNumPrecRadix(rs.getInt("NUM_PREC_RADIX"));
/*
Whether it is allowed to be NULL
0 (columnNoNulls)
1 (columnNullable)
2 (columnNullableUnknown)
*/
field.setAllowNull(rs.getInt("NULLABLE") != 0);
field.setRemarks(rs.getString("REMARKS"));
field.setCharOctetLength(rs.getInt("CHAR_OCTET_LENGTH"));
String isAutoincrement = rs.getString("IS_AUTOINCREMENT");
field.setAutoincrement("YES".equalsIgnoreCase(isAutoincrement));
field.setDefaultValue(rs.getString("COLUMN_DEF"));
tableSchema.add(field);
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get table name list from jdbc for table %s:%s", e, tableName,
Util.getRootCauseMessage(e));
} finally {
close(rs, conn);
}
return tableSchema;
}
@Override
public List<Column> getColumnsFromJdbc(String dbName, String tableName) {
List<JdbcFieldSchema> jdbcTableSchema = getJdbcColumnsInfo(dbName, tableName);
List<Column> dorisTableSchema = Lists.newArrayListWithCapacity(jdbcTableSchema.size());
for (JdbcFieldSchema field : jdbcTableSchema) {
DefaultValueExprDef defaultValueExprDef = null;
if (field.getDefaultValue() != null) {
String colDefaultValue = field.getDefaultValue().toLowerCase();
// current_timestamp()
if (colDefaultValue.startsWith("current_timestamp")) {
long precision = 0;
if (colDefaultValue.contains("(")) {
String substring = colDefaultValue.substring(18, colDefaultValue.length() - 1).trim();
precision = substring.isEmpty() ? 0 : Long.parseLong(substring);
}
defaultValueExprDef = new DefaultValueExprDef("now", precision);
}
}
dorisTableSchema.add(new Column(field.getColumnName(),
jdbcTypeToDoris(field), field.isKey(), null,
field.isAllowNull(), field.isAutoincrement(), field.getDefaultValue(), field.getRemarks(),
true, defaultValueExprDef, -1, null));
}
return dorisTableSchema;
}
protected List<String> getPrimaryKeys(DatabaseMetaData databaseMetaData, String catalogName,
String dbName, String tableName) throws SQLException {
ResultSet rs = null;
List<String> primaryKeys = Lists.newArrayList();
rs = databaseMetaData.getPrimaryKeys(dbName, null, tableName);
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
primaryKeys.add(columnName);
}
rs.close();
return primaryKeys;
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
// For Doris type
if (isDoris) {
return dorisTypeToDoris(fieldSchema.getDataTypeName().toUpperCase());
}
// For mysql type: "INT UNSIGNED":
// fieldSchema.getDataTypeName().split(" ")[0] == "INT"
// fieldSchema.getDataTypeName().split(" ")[1] == "UNSIGNED"
String[] typeFields = fieldSchema.getDataTypeName().split(" ");
String mysqlType = typeFields[0];
// For unsigned int, should extend the type.
if (typeFields.length > 1 && "UNSIGNED".equals(typeFields[1])) {
switch (mysqlType) {
case "TINYINT":
return Type.SMALLINT;
case "SMALLINT":
case "MEDIUMINT":
return Type.INT;
case "INT":
return Type.BIGINT;
case "BIGINT":
return Type.LARGEINT;
case "DECIMAL": {
int precision = fieldSchema.getColumnSize() + 1;
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
}
case "DOUBLE":
// As of MySQL 8.0.17, the UNSIGNED attribute is deprecated
// for columns of type FLOAT, DOUBLE, and DECIMAL (and any synonyms)
// https://dev.mysql.com/doc/refman/8.0/en/numeric-type-syntax.html
// The maximum value may cause errors due to insufficient accuracy
return Type.DOUBLE;
case "FLOAT":
return Type.FLOAT;
default:
throw new JdbcClientException("Unknown UNSIGNED type of mysql, type: [" + mysqlType + "]");
}
}
switch (mysqlType) {
case "BOOLEAN":
return Type.BOOLEAN;
case "TINYINT":
return Type.TINYINT;
case "SMALLINT":
case "YEAR":
return Type.SMALLINT;
case "MEDIUMINT":
case "INT":
return Type.INT;
case "BIGINT":
return Type.BIGINT;
case "DATE":
if (convertDateToNull) {
fieldSchema.setAllowNull(true);
}
return ScalarType.createDateV2Type();
case "TIMESTAMP":
case "DATETIME": {
// mysql can support microsecond
// use columnSize to calculate the precision of timestamp/datetime
int columnSize = fieldSchema.getColumnSize();
int scale = columnSize > 19 ? columnSize - 20 : 0;
if (scale > 6) {
scale = 6;
}
if (convertDateToNull) {
fieldSchema.setAllowNull(true);
}
return ScalarType.createDatetimeV2Type(scale);
}
case "FLOAT":
return Type.FLOAT;
case "DOUBLE":
return Type.DOUBLE;
case "DECIMAL": {
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
}
case "CHAR":
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
case "VARCHAR":
return ScalarType.createVarcharType(fieldSchema.columnSize);
case "BIT":
if (fieldSchema.getColumnSize() == 1) {
return Type.BOOLEAN;
} else {
return ScalarType.createStringType();
}
case "JSON":
return ScalarType.createJsonbType();
case "TIME":
case "TINYTEXT":
case "TEXT":
case "MEDIUMTEXT":
case "LONGTEXT":
case "TINYBLOB":
case "BLOB":
case "MEDIUMBLOB":
case "LONGBLOB":
case "STRING":
case "SET":
case "BINARY":
case "VARBINARY":
case "ENUM":
return ScalarType.createStringType();
default:
return Type.UNSUPPORTED;
}
}
private boolean isConvertDatetimeToNull(JdbcClientConfig jdbcClientConfig) {
// Check if the JDBC URL contains "zeroDateTimeBehavior=convertToNull".
return jdbcClientConfig.getJdbcUrl().contains("zeroDateTimeBehavior=convertToNull");
}
/**
* get all columns like DatabaseMetaData.getColumns in mysql-jdbc-connector
*/
private Map<String, String> getColumnsDataTypeUseQuery(String dbName, String tableName) {
Connection conn = getConnection();
ResultSet resultSet = null;
Map<String, String> fieldtoType = Maps.newHashMap();
StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
queryBuf.append(tableName);
queryBuf.append(" FROM ");
queryBuf.append(dbName);
try (Statement stmt = conn.createStatement()) {
resultSet = stmt.executeQuery(queryBuf.toString());
while (resultSet.next()) {
// get column name
String fieldName = resultSet.getString("Field");
// get original type name
String typeName = resultSet.getString("Type");
fieldtoType.put(fieldName, typeName);
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get column list from jdbc for table %s:%s", tableName,
Util.getRootCauseMessage(e));
} finally {
close(resultSet, conn);
}
return fieldtoType;
}
private Type dorisTypeToDoris(String type) {
if (type == null || type.isEmpty()) {
return Type.UNSUPPORTED;
}
String upperType = type.toUpperCase();
// For ARRAY type
if (upperType.startsWith("ARRAY")) {
String innerType = upperType.substring(6, upperType.length() - 1).trim();
Type arrayInnerType = dorisTypeToDoris(innerType);
return ArrayType.create(arrayInnerType, true);
}
int openParen = upperType.indexOf("(");
String baseType = (openParen == -1) ? upperType : upperType.substring(0, openParen);
switch (baseType) {
case "BOOL":
case "BOOLEAN":
return Type.BOOLEAN;
case "TINYINT":
return Type.TINYINT;
case "INT":
return Type.INT;
case "SMALLINT":
return Type.SMALLINT;
case "BIGINT":
return Type.BIGINT;
case "LARGEINT":
return Type.LARGEINT;
case "FLOAT":
return Type.FLOAT;
case "DOUBLE":
return Type.DOUBLE;
case "DECIMAL":
case "DECIMALV3": {
String[] params = upperType.substring(openParen + 1, upperType.length() - 1).split(",");
int precision = Integer.parseInt(params[0].trim());
int scale = Integer.parseInt(params[1].trim());
return createDecimalOrStringType(precision, scale);
}
case "DATE":
case "DATEV2":
return ScalarType.createDateV2Type();
case "DATETIME":
case "DATETIMEV2": {
int scale = (openParen == -1) ? 6
: Integer.parseInt(upperType.substring(openParen + 1, upperType.length() - 1));
if (scale > 6) {
scale = 6;
}
return ScalarType.createDatetimeV2Type(scale);
}
case "CHAR":
case "VARCHAR": {
int length = Integer.parseInt(upperType.substring(openParen + 1, upperType.length() - 1));
return baseType.equals("CHAR")
? ScalarType.createCharType(length) : ScalarType.createVarcharType(length);
}
case "STRING":
case "TEXT":
return ScalarType.createStringType();
case "JSON":
return ScalarType.createJsonbType();
case "HLL":
return ScalarType.createHllType();
case "BITMAP":
return Type.BITMAP;
default:
return Type.UNSUPPORTED;
}
}
}