blob: d2c76c3608c9be6a3a5dfadb93b4799e701bf1a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jdbc;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
/** Provides utility functions for working with {@link JdbcIO}. */
public class JdbcUtil {
/** Generates an insert statement based on {@Link Schema.Field}. * */
public static String generateStatement(String tableName, List<Schema.Field> fields) {
String fieldNames =
IntStream.range(0, fields.size())
.mapToObj(
(index) -> {
return fields.get(index).getName();
})
.collect(Collectors.joining(", "));
String valuePlaceholder =
IntStream.range(0, fields.size())
.mapToObj(
(index) -> {
return "?";
})
.collect(Collectors.joining(", "));
return String.format("INSERT INTO %s(%s) VALUES(%s)", tableName, fieldNames, valuePlaceholder);
}
/** PreparedStatementSetCaller for Schema Field types. * */
private static Map<Schema.TypeName, JdbcIO.PreparedStatementSetCaller> typeNamePsSetCallerMap =
new EnumMap<Schema.TypeName, JdbcIO.PreparedStatementSetCaller>(
ImmutableMap.<Schema.TypeName, JdbcIO.PreparedStatementSetCaller>builder()
.put(
Schema.TypeName.BYTE,
(element, ps, i, fieldWithIndex) ->
ps.setByte(i + 1, element.getByte(fieldWithIndex.getIndex())))
.put(
Schema.TypeName.INT16,
(element, ps, i, fieldWithIndex) ->
ps.setInt(i + 1, element.getInt16(fieldWithIndex.getIndex())))
.put(
Schema.TypeName.INT64,
(element, ps, i, fieldWithIndex) ->
ps.setLong(i + 1, element.getInt64(fieldWithIndex.getIndex())))
.put(
Schema.TypeName.DECIMAL,
(element, ps, i, fieldWithIndex) ->
ps.setBigDecimal(i + 1, element.getDecimal(fieldWithIndex.getIndex())))
.put(
Schema.TypeName.FLOAT,
(element, ps, i, fieldWithIndex) ->
ps.setFloat(i + 1, element.getFloat(fieldWithIndex.getIndex())))
.put(
Schema.TypeName.DOUBLE,
(element, ps, i, fieldWithIndex) ->
ps.setDouble(i + 1, element.getDouble(fieldWithIndex.getIndex())))
.put(
Schema.TypeName.DATETIME,
(element, ps, i, fieldWithIndex) ->
ps.setTimestamp(
i + 1,
new Timestamp(
element.getDateTime(fieldWithIndex.getIndex()).getMillis())))
.put(
Schema.TypeName.BOOLEAN,
(element, ps, i, fieldWithIndex) ->
ps.setBoolean(i + 1, element.getBoolean(fieldWithIndex.getIndex())))
.put(Schema.TypeName.BYTES, createBytesCaller())
.put(
Schema.TypeName.INT32,
(element, ps, i, fieldWithIndex) ->
ps.setInt(i + 1, element.getInt32(fieldWithIndex.getIndex())))
.put(Schema.TypeName.STRING, createStringCaller())
.build());
/** PreparedStatementSetCaller for Schema Field Logical types. * */
public static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller(
Schema.FieldType fieldType) {
switch (fieldType.getTypeName()) {
case ARRAY:
case ITERABLE:
return (element, ps, i, fieldWithIndex) -> {
ps.setArray(
i + 1,
ps.getConnection()
.createArrayOf(
fieldType.getCollectionElementType().getTypeName().name(),
element.getArray(fieldWithIndex.getIndex()).toArray()));
};
case LOGICAL_TYPE:
{
String logicalTypeName = fieldType.getLogicalType().getIdentifier();
JDBCType jdbcType = JDBCType.valueOf(logicalTypeName);
switch (jdbcType) {
case DATE:
return (element, ps, i, fieldWithIndex) -> {
ps.setDate(
i + 1,
new Date(
getDateOrTimeOnly(
element.getDateTime(fieldWithIndex.getIndex()).toDateTime(), true)
.getTime()
.getTime()));
};
case TIME:
return (element, ps, i, fieldWithIndex) -> {
ps.setTime(
i + 1,
new Time(
getDateOrTimeOnly(
element.getDateTime(fieldWithIndex.getIndex()).toDateTime(), false)
.getTime()
.getTime()));
};
case TIMESTAMP_WITH_TIMEZONE:
return (element, ps, i, fieldWithIndex) -> {
Calendar calendar =
withTimestampAndTimezone(
element.getDateTime(fieldWithIndex.getIndex()).toDateTime());
ps.setTimestamp(i + 1, new Timestamp(calendar.getTime().getTime()), calendar);
};
default:
return getPreparedStatementSetCaller(fieldType.getLogicalType().getBaseType());
}
}
default:
{
if (typeNamePsSetCallerMap.containsKey(fieldType.getTypeName())) {
return typeNamePsSetCallerMap.get(fieldType.getTypeName());
} else {
throw new RuntimeException(
fieldType.getTypeName().name()
+ " in schema is not supported while writing. Please provide statement and preparedStatementSetter");
}
}
}
}
private static JdbcIO.PreparedStatementSetCaller createBytesCaller() {
return (element, ps, i, fieldWithIndex) -> {
validateLogicalTypeLength(
fieldWithIndex.getField(), element.getBytes(fieldWithIndex.getIndex()).length);
ps.setBytes(i + 1, element.getBytes(fieldWithIndex.getIndex()));
};
}
private static JdbcIO.PreparedStatementSetCaller createStringCaller() {
return (element, ps, i, fieldWithIndex) -> {
validateLogicalTypeLength(
fieldWithIndex.getField(), element.getString(fieldWithIndex.getIndex()).length());
ps.setString(i + 1, element.getString(fieldWithIndex.getIndex()));
};
}
private static void validateLogicalTypeLength(Schema.Field field, Integer length) {
try {
if (field.getType().getTypeName().isLogicalType()
&& field.getType().getLogicalType().getArgument() != null) {
int maxLimit = (Integer) field.getType().getLogicalType().getArgument();
if (length >= maxLimit) {
throw new RuntimeException(
String.format(
"Length of Schema.Field[%s] data exceeds database column capacity",
field.getName()));
}
}
} catch (NumberFormatException e) {
// if argument is not set or not integer then do nothing and proceed with the insertion
}
}
private static Calendar getDateOrTimeOnly(DateTime dateTime, boolean wantDateOnly) {
Calendar cal = Calendar.getInstance();
cal.setTimeZone(TimeZone.getTimeZone(dateTime.getZone().getID()));
if (wantDateOnly) { // return date only
cal.set(Calendar.YEAR, dateTime.getYear());
cal.set(Calendar.MONTH, dateTime.getMonthOfYear() - 1);
cal.set(Calendar.DATE, dateTime.getDayOfMonth());
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
} else { // return time only
cal.set(Calendar.YEAR, 1970);
cal.set(Calendar.MONTH, Calendar.JANUARY);
cal.set(Calendar.DATE, 1);
cal.set(Calendar.HOUR_OF_DAY, dateTime.getHourOfDay());
cal.set(Calendar.MINUTE, dateTime.getMinuteOfHour());
cal.set(Calendar.SECOND, dateTime.getSecondOfMinute());
cal.set(Calendar.MILLISECOND, dateTime.getMillisOfSecond());
}
return cal;
}
private static Calendar withTimestampAndTimezone(DateTime dateTime) {
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone(dateTime.getZone().getID()));
calendar.setTimeInMillis(dateTime.getMillis());
return calendar;
}
}