blob: f213128a4061b575bffbef6d0dba731bdbbb5103 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.databases.postgres.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
/** PostgresTypeMapper util class. */
@Internal
public class PostgresTypeMapper implements JdbcDialectTypeMapper {
private static final Logger LOG = LoggerFactory.getLogger(PostgresTypeMapper.class);
// Postgres jdbc driver maps several alias to real type, we use real type rather than alias:
// serial2 <=> int2
// smallserial <=> int2
// serial4 <=> serial
// serial8 <=> bigserial
// smallint <=> int2
// integer <=> int4
// int <=> int4
// bigint <=> int8
// float <=> float8
// boolean <=> bool
// decimal <=> numeric
private static final String PG_SMALLSERIAL = "smallserial";
protected static final String PG_SERIAL = "serial";
protected static final String PG_BIGSERIAL = "bigserial";
private static final String PG_BYTEA = "bytea";
private static final String PG_BYTEA_ARRAY = "_bytea";
private static final String PG_SMALLINT = "int2";
private static final String PG_SMALLINT_ARRAY = "_int2";
private static final String PG_INTEGER = "int4";
private static final String PG_INTEGER_ARRAY = "_int4";
private static final String PG_BIGINT = "int8";
private static final String PG_BIGINT_ARRAY = "_int8";
private static final String PG_REAL = "float4";
private static final String PG_REAL_ARRAY = "_float4";
private static final String PG_DOUBLE_PRECISION = "float8";
private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
private static final String PG_NUMERIC = "numeric";
private static final String PG_NUMERIC_ARRAY = "_numeric";
private static final String PG_BOOLEAN = "bool";
private static final String PG_BOOLEAN_ARRAY = "_bool";
private static final String PG_TIMESTAMP = "timestamp";
private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
private static final String PG_TIMESTAMPTZ = "timestamptz";
private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
private static final String PG_DATE = "date";
private static final String PG_DATE_ARRAY = "_date";
private static final String PG_TIME = "time";
private static final String PG_TIME_ARRAY = "_time";
private static final String PG_TEXT = "text";
private static final String PG_TEXT_ARRAY = "_text";
private static final String PG_CHAR = "bpchar";
private static final String PG_CHAR_ARRAY = "_bpchar";
private static final String PG_CHARACTER = "character";
private static final String PG_CHARACTER_ARRAY = "_character";
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
@Override
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
DataType dataType = getMapping(pgType, precision, scale);
if (dataType == null) {
throw new UnsupportedOperationException(
String.format("Doesn't support %s type '%s' yet", getDBType(), pgType));
}
return dataType;
}
protected DataType getMapping(String pgType, int precision, int scale) {
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
case PG_BYTEA_ARRAY:
return DataTypes.ARRAY(DataTypes.BYTES());
case PG_SMALLINT:
case PG_SMALLSERIAL:
return DataTypes.SMALLINT();
case PG_SMALLINT_ARRAY:
return DataTypes.ARRAY(DataTypes.SMALLINT());
case PG_INTEGER:
case PG_SERIAL:
return DataTypes.INT();
case PG_INTEGER_ARRAY:
return DataTypes.ARRAY(DataTypes.INT());
case PG_BIGINT:
case PG_BIGSERIAL:
return DataTypes.BIGINT();
case PG_BIGINT_ARRAY:
return DataTypes.ARRAY(DataTypes.BIGINT());
case PG_REAL:
return DataTypes.FLOAT();
case PG_REAL_ARRAY:
return DataTypes.ARRAY(DataTypes.FLOAT());
case PG_DOUBLE_PRECISION:
return DataTypes.DOUBLE();
case PG_DOUBLE_PRECISION_ARRAY:
return DataTypes.ARRAY(DataTypes.DOUBLE());
case PG_NUMERIC:
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
return DataTypes.DECIMAL(precision, scale);
}
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
case PG_NUMERIC_ARRAY:
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale));
}
return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
case PG_CHAR:
case PG_CHARACTER:
return DataTypes.CHAR(precision);
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
return DataTypes.ARRAY(DataTypes.CHAR(precision));
case PG_CHARACTER_VARYING:
return DataTypes.VARCHAR(precision);
case PG_CHARACTER_VARYING_ARRAY:
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
case PG_TEXT:
return DataTypes.STRING();
case PG_TEXT_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PG_TIMESTAMP:
return DataTypes.TIMESTAMP(scale);
case PG_TIMESTAMP_ARRAY:
return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
case PG_TIMESTAMPTZ:
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale);
case PG_TIMESTAMPTZ_ARRAY:
return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale));
case PG_TIME:
return DataTypes.TIME(scale);
case PG_TIME_ARRAY:
return DataTypes.ARRAY(DataTypes.TIME(scale));
case PG_DATE:
return DataTypes.DATE();
case PG_DATE_ARRAY:
return DataTypes.ARRAY(DataTypes.DATE());
default:
return null;
}
}
protected String getDBType() {
return "Postgres";
}
}