/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jdbc;
import static java.sql.JDBCType.BINARY;
import static java.sql.JDBCType.CHAR;
import static java.sql.JDBCType.LONGNVARCHAR;
import static java.sql.JDBCType.LONGVARBINARY;
import static java.sql.JDBCType.LONGVARCHAR;
import static java.sql.JDBCType.NCHAR;
import static java.sql.JDBCType.NUMERIC;
import static java.sql.JDBCType.NVARCHAR;
import static java.sql.JDBCType.VARBINARY;
import static java.sql.JDBCType.VARCHAR;
import static java.sql.JDBCType.valueOf;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.sql.Array;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.EnumMap;
import java.util.List;
import java.util.TimeZone;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.chrono.ISOChronology;
/** Provides utility functions for working with Beam {@link Schema} types. */
class SchemaUtil {
/**
* Interface implemented by functions that extract values of different types from a JDBC
* ResultSet.
*/
@FunctionalInterface
interface ResultSetFieldExtractor extends Serializable {
Object extract(ResultSet rs, Integer index) throws SQLException;
}
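// Illustrative example (not part of this class's API surface): an extractor is just a
// serializable function over a ResultSet and a 1-based column index, so a custom extractor
// could be written as a lambda, e.g.
//   ResultSetFieldExtractor upperCaseString =
//       (rs, index) -> {
//         String value = rs.getString(index);
//         return value == null ? null : value.toUpperCase();
//       };
// The method references registered below (e.g. ResultSet::getString) have the same shape.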
// ResultSetFieldExtractors for primitive schema types (excluding arrays, structs, and logical types).
private static final EnumMap<Schema.TypeName, ResultSetFieldExtractor>
RESULTSET_FIELD_EXTRACTORS =
new EnumMap<>(
ImmutableMap.<Schema.TypeName, ResultSetFieldExtractor>builder()
.put(Schema.TypeName.BOOLEAN, ResultSet::getBoolean)
.put(Schema.TypeName.BYTE, ResultSet::getByte)
.put(Schema.TypeName.BYTES, ResultSet::getBytes)
.put(Schema.TypeName.DATETIME, ResultSet::getTimestamp)
.put(Schema.TypeName.DECIMAL, ResultSet::getBigDecimal)
.put(Schema.TypeName.DOUBLE, ResultSet::getDouble)
.put(Schema.TypeName.FLOAT, ResultSet::getFloat)
.put(Schema.TypeName.INT16, ResultSet::getShort)
.put(Schema.TypeName.INT32, ResultSet::getInt)
.put(Schema.TypeName.INT64, ResultSet::getLong)
.put(Schema.TypeName.STRING, ResultSet::getString)
.build());
private static final ResultSetFieldExtractor DATE_EXTRACTOR = createDateExtractor();
private static final ResultSetFieldExtractor TIME_EXTRACTOR = createTimeExtractor();
private static final ResultSetFieldExtractor TIMESTAMP_EXTRACTOR = createTimestampExtractor();
/**
* Interface implemented by functions that create a Beam {@link
* org.apache.beam.sdk.schemas.Schema.Field} corresponding to JDBC field metadata.
*/
@FunctionalInterface
interface BeamFieldConverter extends Serializable {
Schema.Field create(int index, ResultSetMetaData md) throws SQLException;
}
private static BeamFieldConverter jdbcTypeToBeamFieldConverter(JDBCType jdbcType) {
switch (jdbcType) {
case ARRAY:
return beamArrayField();
case BIGINT:
return beamFieldOfType(Schema.FieldType.INT64);
case BINARY:
return beamLogicalField(BINARY.getName(), LogicalTypes.FixedLengthBytes::of);
case BIT:
return beamFieldOfType(LogicalTypes.JDBC_BIT_TYPE);
case BOOLEAN:
return beamFieldOfType(Schema.FieldType.BOOLEAN);
case CHAR:
return beamLogicalField(CHAR.getName(), LogicalTypes.FixedLengthString::of);
case DATE:
return beamFieldOfType(LogicalTypes.JDBC_DATE_TYPE);
case DECIMAL:
return beamFieldOfType(Schema.FieldType.DECIMAL);
case DOUBLE:
return beamFieldOfType(Schema.FieldType.DOUBLE);
case FLOAT:
return beamFieldOfType(LogicalTypes.JDBC_FLOAT_TYPE);
case INTEGER:
return beamFieldOfType(Schema.FieldType.INT32);
case LONGNVARCHAR:
return beamLogicalField(LONGNVARCHAR.getName(), LogicalTypes.VariableLengthString::of);
case LONGVARBINARY:
return beamLogicalField(LONGVARBINARY.getName(), LogicalTypes.VariableLengthBytes::of);
case LONGVARCHAR:
return beamLogicalField(LONGVARCHAR.getName(), LogicalTypes.VariableLengthString::of);
case NCHAR:
return beamLogicalField(NCHAR.getName(), LogicalTypes.FixedLengthString::of);
case NUMERIC:
return beamLogicalNumericField(NUMERIC.getName());
case NVARCHAR:
return beamLogicalField(NVARCHAR.getName(), LogicalTypes.VariableLengthString::of);
case REAL:
return beamFieldOfType(Schema.FieldType.FLOAT);
case SMALLINT:
return beamFieldOfType(Schema.FieldType.INT16);
case TIME:
return beamFieldOfType(LogicalTypes.JDBC_TIME_TYPE);
case TIMESTAMP:
return beamFieldOfType(Schema.FieldType.DATETIME);
case TIMESTAMP_WITH_TIMEZONE:
return beamFieldOfType(LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE);
case TINYINT:
return beamFieldOfType(Schema.FieldType.BYTE);
case VARBINARY:
return beamLogicalField(VARBINARY.getName(), LogicalTypes.VariableLengthBytes::of);
case VARCHAR:
return beamLogicalField(VARCHAR.getName(), LogicalTypes.VariableLengthString::of);
default:
throw new UnsupportedOperationException(
"Converting " + jdbcType + " to Beam schema type is not supported");
}
}
/** Infers the Beam {@link Schema} from {@link ResultSetMetaData}. */
static Schema toBeamSchema(ResultSetMetaData md) throws SQLException {
Schema.Builder schemaBuilder = Schema.builder();
for (int i = 1; i <= md.getColumnCount(); i++) {
JDBCType jdbcType = valueOf(md.getColumnType(i));
BeamFieldConverter fieldConverter = jdbcTypeToBeamFieldConverter(jdbcType);
schemaBuilder.addField(fieldConverter.create(i, md));
}
return schemaBuilder.build();
}
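// Illustrative sketch of how toBeamSchema is typically driven; "connection" and the query are
// placeholders, not members of this class:
//   try (Statement stmt = connection.createStatement();
//       ResultSet rs = stmt.executeQuery("SELECT * FROM my_table")) {
//     Schema schema = SchemaUtil.toBeamSchema(rs.getMetaData());
//   }
// JdbcIO obtains a schema from query metadata in a similar way and pairs it with BeamRowMapper
// (below) to produce Row values.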
/** Converts a primitive JDBC field to the corresponding Beam schema field. */
private static BeamFieldConverter beamFieldOfType(Schema.FieldType fieldType) {
return (index, md) -> {
String label = md.getColumnLabel(index);
return Schema.Field.of(label, fieldType)
.withNullable(md.isNullable(index) == ResultSetMetaData.columnNullable);
};
}
/** Converts logical types with arguments such as VARCHAR(25). */
private static <InputT, BaseT> BeamFieldConverter beamLogicalField(
String identifier,
BiFunction<String, Integer, Schema.LogicalType<InputT, BaseT>> constructor) {
return (index, md) -> {
int size = md.getPrecision(index);
Schema.FieldType fieldType =
Schema.FieldType.logicalType(constructor.apply(identifier, size));
return beamFieldOfType(fieldType).create(index, md);
};
}
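// Example (illustrative): a column declared as VARCHAR(25) reports a precision of 25, so the
// resulting field type is equivalent to
//   Schema.FieldType.logicalType(LogicalTypes.VariableLengthString.of("VARCHAR", 25))
// with nullability taken from the column metadata.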
/** Converts numeric fields with specified precision and scale. */
private static BeamFieldConverter beamLogicalNumericField(String identifier) {
return (index, md) -> {
int precision = md.getPrecision(index);
int scale = md.getScale(index);
Schema.FieldType fieldType =
Schema.FieldType.logicalType(
LogicalTypes.FixedPrecisionNumeric.of(identifier, precision, scale));
return beamFieldOfType(fieldType).create(index, md);
};
}
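// Example (illustrative): a column declared as NUMERIC(10, 2) reports precision 10 and scale 2,
// so the resulting field type is equivalent to
//   Schema.FieldType.logicalType(LogicalTypes.FixedPrecisionNumeric.of("NUMERIC", 10, 2))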
/** Converts array fields. */
private static BeamFieldConverter beamArrayField() {
return (index, md) -> {
JDBCType elementJdbcType = valueOf(md.getColumnTypeName(index));
BeamFieldConverter elementFieldConverter = jdbcTypeToBeamFieldConverter(elementJdbcType);
String label = md.getColumnLabel(index);
Schema.FieldType elementBeamType = elementFieldConverter.create(index, md).getType();
return Schema.Field.of(label, Schema.FieldType.array(elementBeamType))
.withNullable(md.isNullable(index) == ResultSetMetaData.columnNullable);
};
}
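// Example (illustrative): for an ARRAY column whose getColumnTypeName(index) is "INTEGER", the
// element converter resolves to Schema.FieldType.INT32 and the field becomes equivalent to
//   Schema.Field.of(label, Schema.FieldType.array(Schema.FieldType.INT32))
// This relies on the JDBC driver reporting the element type name for the array column.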
/** Creates a {@link ResultSetFieldExtractor} for the given type. */
private static ResultSetFieldExtractor createFieldExtractor(Schema.FieldType fieldType) {
Schema.TypeName typeName = fieldType.getTypeName();
switch (typeName) {
case ARRAY:
Schema.FieldType elementType = fieldType.getCollectionElementType();
ResultSetFieldExtractor elementExtractor = createFieldExtractor(elementType);
return createArrayExtractor(elementExtractor);
case DATETIME:
return TIMESTAMP_EXTRACTOR;
case LOGICAL_TYPE:
return createLogicalTypeExtractor(fieldType.getLogicalType());
default:
if (!RESULTSET_FIELD_EXTRACTORS.containsKey(typeName)) {
throw new UnsupportedOperationException(
"BeamRowMapper does not have support for fields of type " + fieldType.toString());
}
return RESULTSET_FIELD_EXTRACTORS.get(typeName);
}
}
/** Creates a {@link ResultSetFieldExtractor} for array types. */
private static ResultSetFieldExtractor createArrayExtractor(
ResultSetFieldExtractor elementExtractor) {
return (rs, index) -> {
Array arrayVal = rs.getArray(index);
if (arrayVal == null) {
return null;
}
List<Object> arrayElements = new ArrayList<>();
ResultSet arrayRs = arrayVal.getResultSet();
// Per the JDBC contract, Array#getResultSet() returns two columns per element row:
// column 1 holds the element's index into the array and column 2 holds the element value.
while (arrayRs.next()) {
arrayElements.add(elementExtractor.extract(arrayRs, 2));
}
return arrayElements;
};
}
/** Creates a {@link ResultSetFieldExtractor} for logical types. */
private static <InputT, BaseT> ResultSetFieldExtractor createLogicalTypeExtractor(
final Schema.LogicalType<InputT, BaseT> fieldType) {
String logicalTypeName = fieldType.getIdentifier();
JDBCType underlyingType = JDBCType.valueOf(logicalTypeName);
switch (underlyingType) {
case DATE:
return DATE_EXTRACTOR;
case TIME:
return TIME_EXTRACTOR;
case TIMESTAMP_WITH_TIMEZONE:
return TIMESTAMP_EXTRACTOR;
default:
ResultSetFieldExtractor extractor = createFieldExtractor(fieldType.getBaseType());
return (rs, index) -> fieldType.toInputType((BaseT) extractor.extract(rs, index));
}
}
/** Converts a SQL date value to a Beam DateTime. */
private static ResultSetFieldExtractor createDateExtractor() {
return (rs, i) -> {
Date date = rs.getDate(i, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
if (date == null) {
return null;
}
ZonedDateTime zdt = ZonedDateTime.of(date.toLocalDate(), LocalTime.MIDNIGHT, ZoneOffset.UTC);
return new DateTime(zdt.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC());
};
}
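// Example (illustrative): a SQL DATE of 2020-03-15 read through this extractor becomes a Joda
// DateTime of 2020-03-15T00:00:00.000Z, i.e. midnight UTC on that calendar date.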
/** Converts a SQL time value to a Beam DateTime. */
private static ResultSetFieldExtractor createTimeExtractor() {
return (rs, i) -> {
Time time = rs.getTime(i, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
if (time == null) {
return null;
}
return new DateTime(time.getTime(), ISOChronology.getInstanceUTC())
.withDate(new LocalDate(0L));
};
}
/** Converts a SQL timestamp value to a Beam DateTime. */
private static ResultSetFieldExtractor createTimestampExtractor() {
return (rs, i) -> {
Timestamp ts = rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
if (ts == null) {
return null;
}
return new DateTime(ts.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC());
};
}
/**
* A {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper} implementation that converts JDBC
* results into Beam {@link Row} objects.
*/
static final class BeamRowMapper implements JdbcIO.RowMapper<Row> {
private final Schema schema;
private final List<ResultSetFieldExtractor> fieldExtractors;
public static BeamRowMapper of(Schema schema) {
List<ResultSetFieldExtractor> fieldExtractors =
IntStream.range(0, schema.getFieldCount())
.mapToObj(i -> createFieldExtractor(schema.getField(i).getType()))
.collect(Collectors.toList());
return new BeamRowMapper(schema, fieldExtractors);
}
private BeamRowMapper(Schema schema, List<ResultSetFieldExtractor> fieldExtractors) {
this.schema = schema;
this.fieldExtractors = fieldExtractors;
}
@Override
public Row mapRow(ResultSet rs) throws Exception {
Row.Builder rowBuilder = Row.withSchema(schema);
for (int i = 0; i < schema.getFieldCount(); i++) {
rowBuilder.addValue(fieldExtractors.get(i).extract(rs, i + 1));
}
return rowBuilder.build();
}
}
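// Illustrative sketch of wiring BeamRowMapper into a read; "config" is a placeholder
// DataSourceConfiguration, and JdbcIO.readRows() performs similar wiring internally:
//   Schema schema = ...; // e.g. inferred via SchemaUtil.toBeamSchema(metadata)
//   JdbcIO.Read<Row> read =
//       JdbcIO.<Row>read()
//           .withDataSourceConfiguration(config)
//           .withQuery("SELECT * FROM my_table")
//           .withRowMapper(SchemaUtil.BeamRowMapper.of(schema))
//           .withCoder(RowCoder.of(schema));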
/**
* Compares two fields by name (case-insensitively) and type. Does not compare the nullability
* of the field types.
*
* @param a field 1
* @param b field 2
* @return true if the fields are equal, false otherwise
*/
public static boolean compareSchemaField(Schema.Field a, Schema.Field b) {
if (!a.getName().equalsIgnoreCase(b.getName())) {
return false;
}
return compareSchemaFieldType(a.getType(), b.getType());
}
/**
* Checks the nullability of the given fields.
*
* @param fields the fields to check
* @return true if any field is not nullable
*/
public static boolean checkNullabilityForFields(List<Schema.Field> fields) {
return fields.stream().anyMatch(field -> !field.getType().getNullable());
}
/**
* Compares two field types. Does not compare nullability.
*
* @param a FieldType 1
* @param b FieldType 2
* @return true if the field types are equal, false otherwise
*/
public static boolean compareSchemaFieldType(Schema.FieldType a, Schema.FieldType b) {
if (a.getTypeName().equals(b.getTypeName())) {
return !a.getTypeName().equals(Schema.TypeName.LOGICAL_TYPE)
|| compareSchemaFieldType(
a.getLogicalType().getBaseType(), b.getLogicalType().getBaseType());
} else if (a.getTypeName().isLogicalType()) {
return a.getLogicalType().getBaseType().getTypeName().equals(b.getTypeName());
} else if (b.getTypeName().isLogicalType()) {
return b.getLogicalType().getBaseType().getTypeName().equals(a.getTypeName());
}
return false;
}
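// Example (illustrative): a field whose type is the VARCHAR logical type (base type STRING)
// compares equal to a plain Schema.FieldType.STRING field, because only the base type is
// consulted when exactly one side is a logical type.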
static class FieldWithIndex implements Serializable {
private final Schema.Field field;
private final Integer index;
private FieldWithIndex(Schema.Field field, Integer index) {
this.field = field;
this.index = index;
}
public static FieldWithIndex of(Schema.Field field, Integer index) {
checkArgument(field != null);
checkArgument(index != null);
return new FieldWithIndex(field, index);
}
public Schema.Field getField() {
return field;
}
public Integer getIndex() {
return index;
}
}
}