blob: b476e661361be62950b97758963326b920c2805f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.databases.oracle.dialect;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import oracle.jdbc.internal.OracleBlob;
import oracle.jdbc.internal.OracleClob;
import oracle.sql.BINARY_DOUBLE;
import oracle.sql.BINARY_FLOAT;
import oracle.sql.CHAR;
import oracle.sql.DATE;
import oracle.sql.NUMBER;
import oracle.sql.RAW;
import oracle.sql.TIMESTAMP;
import oracle.sql.TIMESTAMPTZ;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
/**
 * Runtime converter that is responsible for converting between JDBC objects and Flink internal
 * objects for Oracle.
 *
 * <p>In addition to the standard JDBC value classes, the converters created here also accept the
 * Oracle-specific {@code oracle.sql.*} wrappers (for example {@link NUMBER}, {@link CHAR},
 * {@link RAW}, {@link DATE}, {@link TIMESTAMP} and {@link TIMESTAMPTZ}).
 */
@Internal
public class OracleRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a converter for rows of the given type.
     *
     * @param rowType the logical row type handled by this converter
     */
    public OracleRowConverter(RowType rowType) {
        super(rowType);
    }

    /**
     * Creates the converter that turns a single JDBC field value into Flink's internal
     * representation for the given logical type.
     *
     * <p>Type roots that have no Oracle-specific handling are delegated to the parent class.
     *
     * @param type the logical type of the field to convert
     * @return a converter for values of {@code type}
     */
    @Override
    public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return val -> null;
            case BOOLEAN:
                return val -> val instanceof NUMBER ? ((NUMBER) val).booleanValue() : val;
            case FLOAT:
                return val ->
                        val instanceof NUMBER
                                ? ((NUMBER) val).floatValue()
                                : val instanceof BINARY_FLOAT
                                        ? ((BINARY_FLOAT) val).floatValue()
                                        : val instanceof BigDecimal
                                                ? ((BigDecimal) val).floatValue()
                                                : val;
            case DOUBLE:
                return val ->
                        val instanceof NUMBER
                                ? ((NUMBER) val).doubleValue()
                                : val instanceof BINARY_DOUBLE
                                        ? ((BINARY_DOUBLE) val).doubleValue()
                                        : val instanceof BigDecimal
                                                ? ((BigDecimal) val).doubleValue()
                                                : val;
            case TINYINT:
                return val ->
                        val instanceof NUMBER
                                ? ((NUMBER) val).byteValue()
                                : val instanceof BigDecimal ? ((BigDecimal) val).byteValue() : val;
            case SMALLINT:
                return val ->
                        val instanceof NUMBER
                                ? ((NUMBER) val).shortValue()
                                : val instanceof BigDecimal
                                        ? ((BigDecimal) val).shortValue()
                                        : val;
            case INTEGER:
                return val ->
                        val instanceof NUMBER
                                ? ((NUMBER) val).intValue()
                                : val instanceof BigDecimal ? ((BigDecimal) val).intValue() : val;
            case BIGINT:
                return val ->
                        val instanceof NUMBER
                                ? ((NUMBER) val).longValue()
                                : val instanceof BigDecimal ? ((BigDecimal) val).longValue() : val;
            case DECIMAL:
                final int precision = ((DecimalType) type).getPrecision();
                final int scale = ((DecimalType) type).getScale();
                return val -> {
                    final BigDecimal decimal;
                    if (val instanceof BigInteger) {
                        decimal = new BigDecimal((BigInteger) val, 0);
                    } else if (val instanceof NUMBER) {
                        // Accept the Oracle wrapper like every other numeric branch does,
                        // instead of failing with a ClassCastException.
                        decimal = ((NUMBER) val).bigDecimalValue();
                    } else {
                        decimal = (BigDecimal) val;
                    }
                    return DecimalData.fromBigDecimal(decimal, precision, scale);
                };
            case CHAR:
            case VARCHAR:
                return val ->
                        (val instanceof CHAR)
                                ? StringData.fromString(((CHAR) val).getString())
                                : (val instanceof OracleClob)
                                        ? StringData.fromString(((OracleClob) val).stringValue())
                                        : StringData.fromString((String) val);
            case BINARY:
            case VARBINARY:
            case RAW:
                return val -> {
                    if (val instanceof RAW) {
                        return ((RAW) val).getBytes();
                    }
                    if (val instanceof OracleBlob) {
                        final OracleBlob blob = (OracleBlob) val;
                        // BLOB positions are 1-based. toIntExact fails loudly for BLOBs that
                        // do not fit into a single byte[] instead of silently truncating the
                        // length.
                        return blob.getBytes(1, Math.toIntExact(blob.length()));
                    }
                    if (val instanceof byte[]) {
                        // Already the target representation; calling toString() on an array
                        // would produce its identity string rather than its content.
                        return val;
                    }
                    // Last resort: encode the textual form with a fixed charset so the result
                    // does not depend on the platform default.
                    return val.toString().getBytes(StandardCharsets.UTF_8);
                };
            case INTERVAL_YEAR_MONTH:
                return val -> val instanceof NUMBER ? ((NUMBER) val).intValue() : val;
            case INTERVAL_DAY_TIME:
                // Flink represents day-time intervals internally as a long, so the value must
                // not be narrowed to an int here.
                return val -> val instanceof NUMBER ? ((NUMBER) val).longValue() : val;
            case DATE:
                // Internal representation: number of days since the epoch, as an int.
                return val ->
                        val instanceof DATE
                                ? (int) (((DATE) val).dateValue().toLocalDate().toEpochDay())
                                : val instanceof Timestamp
                                        ? (int)
                                                (((Timestamp) val)
                                                        .toLocalDateTime()
                                                        .toLocalDate()
                                                        .toEpochDay())
                                        : (int) (((Date) val).toLocalDate().toEpochDay());
            case TIME_WITHOUT_TIME_ZONE:
                // Internal representation: milliseconds of the day, as an int.
                return val ->
                        val instanceof DATE
                                ? (int)
                                        (((DATE) val).timeValue().toLocalTime().toNanoOfDay()
                                                / 1_000_000L)
                                : (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return val ->
                        val instanceof TIMESTAMP
                                ? TimestampData.fromTimestamp(((TIMESTAMP) val).timestampValue())
                                : TimestampData.fromTimestamp((Timestamp) val);
            case TIMESTAMP_WITH_TIME_ZONE:
                return val -> {
                    if (val instanceof TIMESTAMPTZ) {
                        final TIMESTAMPTZ ts = (TIMESTAMPTZ) val;
                        // Re-express the instant in the zone carried by the value and keep the
                        // resulting local date-time.
                        final ZonedDateTime zdt =
                                ZonedDateTime.ofInstant(
                                        ts.timestampValue().toInstant(),
                                        ts.getTimeZone().toZoneId());
                        return TimestampData.fromLocalDateTime(zdt.toLocalDateTime());
                    } else {
                        return TimestampData.fromTimestamp((Timestamp) val);
                    }
                };
            case ARRAY:
            case ROW:
            case MAP:
            case MULTISET:
            default:
                return super.createInternalConverter(type);
        }
    }

    /**
     * Returns the name of this converter.
     *
     * @return the constant string {@code "Oracle"}
     */
    @Override
    public String converterName() {
        return "Oracle";
    }
}