blob: 995e0912c89b60edccc15db2c03706e65bc66eca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.databases.mysql.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
/** MySQLTypeMapper util class. */
@Internal
public class MySqlTypeMapper implements JdbcDialectTypeMapper {
private static final Logger LOG = LoggerFactory.getLogger(MySqlTypeMapper.class);
// ============================data types=====================
private static final String MYSQL_UNKNOWN = "UNKNOWN";
private static final String MYSQL_BIT = "BIT";
// -------------------------number----------------------------
private static final String MYSQL_TINYINT = "TINYINT";
private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
private static final String MYSQL_SMALLINT = "SMALLINT";
private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
private static final String MYSQL_INT = "INT";
private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
private static final String MYSQL_INTEGER = "INTEGER";
private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
private static final String MYSQL_BIGINT = "BIGINT";
private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
private static final String MYSQL_DECIMAL = "DECIMAL";
private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
private static final String MYSQL_FLOAT = "FLOAT";
private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
private static final String MYSQL_DOUBLE = "DOUBLE";
private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
// -------------------------string----------------------------
private static final String MYSQL_CHAR = "CHAR";
private static final String MYSQL_VARCHAR = "VARCHAR";
private static final String MYSQL_TINYTEXT = "TINYTEXT";
private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
private static final String MYSQL_TEXT = "TEXT";
private static final String MYSQL_LONGTEXT = "LONGTEXT";
private static final String MYSQL_JSON = "JSON";
// ------------------------------time-------------------------
private static final String MYSQL_DATE = "DATE";
private static final String MYSQL_DATETIME = "DATETIME";
private static final String MYSQL_TIME = "TIME";
private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
private static final String MYSQL_YEAR = "YEAR";
// ------------------------------blob-------------------------
private static final String MYSQL_TINYBLOB = "TINYBLOB";
private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
private static final String MYSQL_BLOB = "BLOB";
private static final String MYSQL_LONGBLOB = "LONGBLOB";
private static final String MYSQL_BINARY = "BINARY";
private static final String MYSQL_VARBINARY = "VARBINARY";
private static final String MYSQL_GEOMETRY = "GEOMETRY";
private static final int RAW_TIME_LENGTH = 10;
private static final int RAW_TIMESTAMP_LENGTH = 19;
private final String databaseVersion;
private final String driverVersion;
public MySqlTypeMapper(String databaseVersion, String driverVersion) {
this.databaseVersion = databaseVersion;
this.driverVersion = driverVersion;
}
@Override
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
throws SQLException {
String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
String columnName = metadata.getColumnName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (mysqlType) {
case MYSQL_BIT:
return DataTypes.BOOLEAN();
case MYSQL_TINYBLOB:
case MYSQL_MEDIUMBLOB:
case MYSQL_BLOB:
case MYSQL_LONGBLOB:
case MYSQL_VARBINARY:
case MYSQL_BINARY:
// BINARY is not supported in MySqlDialect now.
// VARBINARY(n) is not supported in MySqlDialect when 'n' is not equals to
// Integer.MAX_VALUE. Please see
// org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect#supportedTypes and
// org.apache.flink.connector.jdbc.dialect.AbstractDialect#validate for more
// details.
return DataTypes.BYTES();
case MYSQL_TINYINT:
return DataTypes.TINYINT();
case MYSQL_TINYINT_UNSIGNED:
case MYSQL_SMALLINT:
return DataTypes.SMALLINT();
case MYSQL_SMALLINT_UNSIGNED:
case MYSQL_MEDIUMINT:
case MYSQL_MEDIUMINT_UNSIGNED:
case MYSQL_INT:
case MYSQL_INTEGER:
return DataTypes.INT();
case MYSQL_INT_UNSIGNED:
case MYSQL_INTEGER_UNSIGNED:
case MYSQL_BIGINT:
return DataTypes.BIGINT();
case MYSQL_BIGINT_UNSIGNED:
return DataTypes.DECIMAL(20, 0);
case MYSQL_DECIMAL:
return DataTypes.DECIMAL(precision, scale);
case MYSQL_DECIMAL_UNSIGNED:
checkMaxPrecision(tablePath, columnName, precision);
return DataTypes.DECIMAL(precision + 1, scale);
case MYSQL_FLOAT:
return DataTypes.FLOAT();
case MYSQL_FLOAT_UNSIGNED:
LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
return DataTypes.FLOAT();
case MYSQL_DOUBLE:
return DataTypes.DOUBLE();
case MYSQL_DOUBLE_UNSIGNED:
LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
return DataTypes.DOUBLE();
case MYSQL_CHAR:
return DataTypes.CHAR(precision);
case MYSQL_VARCHAR:
case MYSQL_TINYTEXT:
case MYSQL_MEDIUMTEXT:
case MYSQL_TEXT:
return DataTypes.VARCHAR(precision);
case MYSQL_JSON:
return DataTypes.STRING();
case MYSQL_LONGTEXT:
LOG.warn(
"Type '{}' has a maximum precision of 536870911 in MySQL. "
+ "Due to limitations in the Flink type system, "
+ "the precision will be set to 2147483647.",
MYSQL_LONGTEXT);
return DataTypes.STRING();
case MYSQL_DATE:
return DataTypes.DATE();
case MYSQL_TIME:
return isExplicitPrecision(precision, RAW_TIME_LENGTH)
? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
: DataTypes.TIME(0);
case MYSQL_DATETIME:
case MYSQL_TIMESTAMP:
boolean explicitPrecision = isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH);
if (explicitPrecision) {
int p = precision - RAW_TIMESTAMP_LENGTH - 1;
if (p <= 6 && p >= 0) {
return DataTypes.TIMESTAMP(p);
}
return p > 6 ? DataTypes.TIMESTAMP(6) : DataTypes.TIMESTAMP(0);
}
return DataTypes.TIMESTAMP(0);
case MYSQL_YEAR:
case MYSQL_GEOMETRY:
case MYSQL_UNKNOWN:
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support MySQL type '%s' on column '%s' in MySQL version %s, driver version %s yet.",
mysqlType, jdbcColumnName, databaseVersion, driverVersion));
}
}
private boolean isExplicitPrecision(int precision, int defaultPrecision) {
return precision > defaultPrecision && precision - defaultPrecision - 1 <= 9;
}
private void checkMaxPrecision(ObjectPath tablePath, String columnName, int precision) {
if (precision >= DecimalType.MAX_PRECISION) {
throw new CatalogException(
String.format(
"Precision %s of table %s column name %s in type %s exceeds "
+ "DecimalType.MAX_PRECISION %s.",
precision,
tablePath.getFullName(),
columnName,
MYSQL_DECIMAL_UNSIGNED,
DecimalType.MAX_PRECISION));
}
}
}