blob: c14b9dc916867431b1f95af7f09db1f9e3f045c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Array;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.ZoneId;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.chrono.ISOChronology;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link SchemaUtil}. */
@RunWith(JUnit4.class)
public class SchemaUtilTest {

  /**
   * Verifies that {@link SchemaUtil#toBeamSchema} converts mocked JDBC column metadata (one column
   * per listed {@link Types} code) into the expected Beam {@link Schema}.
   */
  @Test
  public void testToBeamSchema() throws SQLException {
    ResultSetMetaData mockResultSetMetaData = mock(ResultSetMetaData.class);
    ImmutableList<JdbcFieldInfo> fieldInfo =
        ImmutableList.of(
            JdbcFieldInfo.of("int_array_col", Types.ARRAY, JDBCType.INTEGER.getName(), false),
            JdbcFieldInfo.of("bigint_col", Types.BIGINT),
            JdbcFieldInfo.of("binary_col", Types.BINARY, 255),
            JdbcFieldInfo.of("bit_col", Types.BIT),
            JdbcFieldInfo.of("boolean_col", Types.BOOLEAN),
            JdbcFieldInfo.of("char_col", Types.CHAR, 255),
            JdbcFieldInfo.of("date_col", Types.DATE),
            JdbcFieldInfo.of("decimal_col", Types.DECIMAL),
            JdbcFieldInfo.of("double_col", Types.DOUBLE),
            JdbcFieldInfo.of("float_col", Types.FLOAT),
            JdbcFieldInfo.of("integer_col", Types.INTEGER),
            JdbcFieldInfo.of("longnvarchar_col", Types.LONGNVARCHAR, 1024),
            JdbcFieldInfo.of("longvarchar_col", Types.LONGVARCHAR, 1024),
            JdbcFieldInfo.of("longvarbinary_col", Types.LONGVARBINARY, 1024),
            JdbcFieldInfo.of("nchar_col", Types.NCHAR, 255),
            JdbcFieldInfo.of("numeric_col", Types.NUMERIC, 12, 4),
            JdbcFieldInfo.of("nvarchar_col", Types.NVARCHAR, 255),
            JdbcFieldInfo.of("real_col", Types.REAL),
            JdbcFieldInfo.of("smallint_col", Types.SMALLINT),
            JdbcFieldInfo.of("time_col", Types.TIME),
            JdbcFieldInfo.of("timestamp_col", Types.TIMESTAMP),
            JdbcFieldInfo.of("timestamptz_col", Types.TIMESTAMP_WITH_TIMEZONE),
            JdbcFieldInfo.of("tinyint_col", Types.TINYINT),
            JdbcFieldInfo.of("varbinary_col", Types.VARBINARY, 255),
            JdbcFieldInfo.of("varchar_col", Types.VARCHAR, 255));
    when(mockResultSetMetaData.getColumnCount()).thenReturn(fieldInfo.size());
    for (int i = 0; i < fieldInfo.size(); i++) {
      JdbcFieldInfo f = fieldInfo.get(i);
      // JDBC column indices are 1-based.
      when(mockResultSetMetaData.getColumnLabel(eq(i + 1))).thenReturn(f.columnLabel);
      when(mockResultSetMetaData.getColumnType(eq(i + 1))).thenReturn(f.columnType);
      when(mockResultSetMetaData.getColumnTypeName(eq(i + 1))).thenReturn(f.columnTypeName);
      when(mockResultSetMetaData.getPrecision(eq(i + 1))).thenReturn(f.precision);
      when(mockResultSetMetaData.getScale(eq(i + 1))).thenReturn(f.scale);
      when(mockResultSetMetaData.isNullable(eq(i + 1)))
          .thenReturn(
              f.nullable ? ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls);
    }
    Schema wantBeamSchema =
        Schema.builder()
            .addArrayField("int_array_col", Schema.FieldType.INT32)
            .addField("bigint_col", Schema.FieldType.INT64)
            .addField("binary_col", LogicalTypes.fixedLengthBytes(JDBCType.BINARY, 255))
            .addField("bit_col", LogicalTypes.JDBC_BIT_TYPE)
            .addField("boolean_col", Schema.FieldType.BOOLEAN)
            .addField("char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 255))
            .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
            .addField("decimal_col", Schema.FieldType.DECIMAL)
            .addField("double_col", Schema.FieldType.DOUBLE)
            .addField("float_col", LogicalTypes.JDBC_FLOAT_TYPE)
            .addField("integer_col", Schema.FieldType.INT32)
            .addField(
                "longnvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGNVARCHAR, 1024))
            .addField(
                "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 1024))
            .addField(
                "longvarbinary_col", LogicalTypes.variableLengthBytes(JDBCType.LONGVARBINARY, 1024))
            .addField("nchar_col", LogicalTypes.fixedLengthString(JDBCType.NCHAR, 255))
            .addField("numeric_col", LogicalTypes.numeric(12, 4))
            .addField("nvarchar_col", LogicalTypes.variableLengthString(JDBCType.NVARCHAR, 255))
            .addField("real_col", Schema.FieldType.FLOAT)
            .addField("smallint_col", Schema.FieldType.INT16)
            .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
            .addField("timestamp_col", Schema.FieldType.DATETIME)
            .addField("timestamptz_col", LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE)
            .addField("tinyint_col", Schema.FieldType.BYTE)
            .addField("varbinary_col", LogicalTypes.variableLengthBytes(JDBCType.VARBINARY, 255))
            .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255))
            .build();
    Schema haveBeamSchema = SchemaUtil.toBeamSchema(mockResultSetMetaData);
    assertEquals(wantBeamSchema, haveBeamSchema);
  }

  /**
   * Verifies that {@link SchemaUtil.BeamRowMapper} reads an SQL ARRAY column through {@link
   * Array#getResultSet()} and produces a Beam array field value.
   */
  @Test
  public void testBeamRowMapperArray() throws Exception {
    // The array's elements are served as rows of a nested ResultSet, one element per row in
    // column 1.
    ResultSet mockArrayElementsResultSet = mock(ResultSet.class);
    when(mockArrayElementsResultSet.next()).thenReturn(true, true, true, false);
    when(mockArrayElementsResultSet.getInt(eq(1))).thenReturn(10, 20, 30);
    Array mockArray = mock(Array.class);
    when(mockArray.getResultSet()).thenReturn(mockArrayElementsResultSet);
    ResultSet mockResultSet = mock(ResultSet.class);
    when(mockResultSet.getArray(eq(1))).thenReturn(mockArray);
    Schema wantSchema =
        Schema.builder().addField("array", Schema.FieldType.array(Schema.FieldType.INT32)).build();
    Row wantRow =
        Row.withSchema(wantSchema).addValues((Object) ImmutableList.of(10, 20, 30)).build();
    SchemaUtil.BeamRowMapper beamRowMapper = SchemaUtil.BeamRowMapper.of(wantSchema);
    Row haveRow = beamRowMapper.mapRow(mockResultSet);
    assertEquals(wantRow, haveRow);
  }

  /**
   * Verifies that {@link SchemaUtil.BeamRowMapper} reads primitive columns with the typed {@link
   * ResultSet} getters. It also checks that columns for which {@link ResultSet#wasNull()} reports
   * {@code true} map to {@code null}.
   */
  @Test
  public void testBeamRowMapperPrimitiveTypes() throws Exception {
    ResultSet mockResultSet = mock(ResultSet.class);
    // Emulates JDBC wasNull(). Getters for the NULL columns (18, 19) set the flag. Reading it
    // returns the flag and clears it, so every other column reports non-null.
    AtomicBoolean isNull = new AtomicBoolean(false);
    when(mockResultSet.wasNull())
        .thenAnswer(
            x -> {
              boolean val = isNull.get();
              isNull.set(false);
              return val;
            });
    // Column i of the mocked ResultSet backs the i-th field of wantSchema below.
    when(mockResultSet.getLong(eq(1))).thenReturn(42L);
    when(mockResultSet.getBytes(eq(2))).thenReturn("binary".getBytes(Charset.forName("UTF-8")));
    when(mockResultSet.getBoolean(eq(3))).thenReturn(true);
    when(mockResultSet.getBoolean(eq(4))).thenReturn(false);
    when(mockResultSet.getString(eq(5))).thenReturn("char");
    when(mockResultSet.getBigDecimal(eq(6))).thenReturn(BigDecimal.valueOf(25L));
    when(mockResultSet.getDouble(eq(7))).thenReturn(20.5D);
    when(mockResultSet.getFloat(eq(8))).thenReturn(15.5F);
    when(mockResultSet.getInt(eq(9))).thenReturn(10);
    when(mockResultSet.getString(eq(10))).thenReturn("longvarchar");
    when(mockResultSet.getBytes(eq(11)))
        .thenReturn("longvarbinary".getBytes(Charset.forName("UTF-8")));
    when(mockResultSet.getBigDecimal(eq(12))).thenReturn(BigDecimal.valueOf(1000L));
    when(mockResultSet.getFloat(eq(13))).thenReturn(32F);
    when(mockResultSet.getShort(eq(14))).thenReturn((short) 8);
    when(mockResultSet.getShort(eq(15))).thenReturn((short) 4);
    when(mockResultSet.getBytes(eq(16))).thenReturn("varbinary".getBytes(Charset.forName("UTF-8")));
    when(mockResultSet.getString(eq(17))).thenReturn("varchar");
    when(mockResultSet.getBoolean(eq(18)))
        .thenAnswer(
            x -> {
              isNull.set(true);
              return false;
            });
    when(mockResultSet.getInt(eq(19)))
        .thenAnswer(
            x -> {
              isNull.set(true);
              return 0;
            });
    Schema wantSchema =
        Schema.builder()
            .addField("bigint_col", Schema.FieldType.INT64)
            .addField("binary_col", Schema.FieldType.BYTES)
            .addField("bit_col", Schema.FieldType.BOOLEAN)
            .addField("boolean_col", Schema.FieldType.BOOLEAN)
            .addField("char_col", Schema.FieldType.STRING)
            .addField("decimal_col", Schema.FieldType.DECIMAL)
            .addField("double_col", Schema.FieldType.DOUBLE)
            .addField("float_col", Schema.FieldType.FLOAT)
            .addField("integer_col", Schema.FieldType.INT32)
            .addField("longvarchar_col", Schema.FieldType.STRING)
            .addField("longvarbinary_col", Schema.FieldType.BYTES)
            .addField("numeric_col", Schema.FieldType.DECIMAL)
            .addField("real_col", Schema.FieldType.FLOAT)
            .addField("smallint_col", Schema.FieldType.INT16)
            .addField("tinyint_col", Schema.FieldType.INT16)
            .addField("varbinary_col", Schema.FieldType.BYTES)
            .addField("varchar_col", Schema.FieldType.STRING)
            .addField("nullable_boolean_col", Schema.FieldType.BOOLEAN.withNullable(true))
            .addField("another_int_col", Schema.FieldType.INT32.withNullable(true))
            .build();
    Row wantRow =
        Row.withSchema(wantSchema)
            .addValues(
                42L,
                "binary".getBytes(Charset.forName("UTF-8")),
                true,
                false,
                "char",
                BigDecimal.valueOf(25L),
                20.5D,
                15.5F,
                10,
                "longvarchar",
                "longvarbinary".getBytes(Charset.forName("UTF-8")),
                BigDecimal.valueOf(1000L),
                32F,
                (short) 8,
                (short) 4,
                "varbinary".getBytes(Charset.forName("UTF-8")),
                "varchar",
                null,
                null)
            .build();
    SchemaUtil.BeamRowMapper beamRowMapper = SchemaUtil.BeamRowMapper.of(wantSchema);
    Row haveRow = beamRowMapper.mapRow(mockResultSet);
    assertEquals(wantRow, haveRow);
  }

  /**
   * Verifies that a Beam schema built from the JDBC string, date and time logical types converts
   * to the expected Avro schema via {@link AvroUtils#toAvroSchema}.
   */
  @Test
  public void testJdbcLogicalTypesMapValidAvroSchemaIT() {
    String expectedAvroSchema =
        "{"
            + " \"type\": \"record\","
            + " \"name\": \"topLevelRecord\","
            + " \"fields\": [{"
            + " \"name\": \"longvarchar_col\","
            + " \"type\": {"
            + " \"type\": \"string\","
            + " \"logicalType\": \"varchar\","
            + " \"maxLength\": 50"
            + " }"
            + " }, {"
            + " \"name\": \"varchar_col\","
            + " \"type\": {"
            + " \"type\": \"string\","
            + " \"logicalType\": \"varchar\","
            + " \"maxLength\": 15"
            + " }"
            + " }, {"
            + " \"name\": \"fixedlength_char_col\","
            + " \"type\": {"
            + " \"type\": \"string\","
            + " \"logicalType\": \"char\","
            + " \"maxLength\": 25"
            + " }"
            + " }, {"
            + " \"name\": \"date_col\","
            + " \"type\": {"
            + " \"type\": \"int\","
            + " \"logicalType\": \"date\""
            + " }"
            + " }, {"
            + " \"name\": \"time_col\","
            + " \"type\": {"
            + " \"type\": \"int\","
            + " \"logicalType\": \"time-millis\""
            + " }"
            + " }]"
            + "}";
    Schema jdbcRowSchema =
        Schema.builder()
            .addField(
                "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50))
            .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15))
            .addField("fixedlength_char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25))
            .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
            .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
            .build();
    // Compute the Avro schema once. The previous debug println to stdout has been removed.
    org.apache.avro.Schema haveAvroSchema = AvroUtils.toAvroSchema(jdbcRowSchema);
    assertEquals(new org.apache.avro.Schema.Parser().parse(expectedAvroSchema), haveAvroSchema);
  }

  /**
   * Verifies that {@link SchemaUtil.BeamRowMapper} maps DATE, TIME, TIMESTAMP WITH TIME ZONE and
   * TIMESTAMP columns to Joda {@link DateTime} values.
   */
  @Test
  public void testBeamRowMapperDateTime() throws Exception {
    long epochMilli = 1558719710000L;
    ResultSet mockResultSet = mock(ResultSet.class);
    // Derive the mocked date in UTC so it names the same calendar day as the expected UTC
    // start-of-day below. Using the JVM default zone made this test fail wherever that zone's date
    // at epochMilli differs from UTC's (e.g. zones east of about UTC+6:18).
    when(mockResultSet.getObject(eq(1), eq(java.time.LocalDate.class)))
        .thenReturn(
            java.time.Instant.ofEpochMilli(epochMilli).atZone(ZoneId.of("UTC")).toLocalDate());
    when(mockResultSet.getTime(eq(2), any())).thenReturn(new Time(epochMilli));
    when(mockResultSet.getTimestamp(eq(3), any())).thenReturn(new Timestamp(epochMilli));
    when(mockResultSet.getTimestamp(eq(4), any())).thenReturn(new Timestamp(epochMilli));
    Schema wantSchema =
        Schema.builder()
            .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
            .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
            .addField("timestamptz_col", LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE)
            .addField("timestamp_col", Schema.FieldType.DATETIME)
            .build();
    DateTime wantDateTime = new DateTime(epochMilli, ISOChronology.getInstanceUTC());
    Row wantRow =
        Row.withSchema(wantSchema)
            .addValues(
                wantDateTime.withTimeAtStartOfDay(),
                wantDateTime.withDate(new LocalDate(0L)),
                wantDateTime,
                wantDateTime)
            .build();
    SchemaUtil.BeamRowMapper beamRowMapper = SchemaUtil.BeamRowMapper.of(wantSchema);
    Row haveRow = beamRowMapper.mapRow(mockResultSet);
    assertEquals(wantRow, haveRow);
  }

  ////////////////////////////////////////////////////////////////////////////////////////

  /**
   * Column metadata fed into the mocked {@link ResultSetMetaData} in {@link #testToBeamSchema()}.
   * The {@code of} factories default the type name to {@code null}, nullability to {@code false}
   * and precision/scale to {@code 0} when those are not given.
   */
  private static final class JdbcFieldInfo {
    private final String columnLabel;
    private final int columnType;
    private final String columnTypeName;
    private final boolean nullable;
    private final int precision;
    private final int scale;

    private JdbcFieldInfo(
        String columnLabel,
        int columnType,
        String columnTypeName,
        boolean nullable,
        int precision,
        int scale) {
      this.columnLabel = columnLabel;
      this.columnType = columnType;
      this.columnTypeName = columnTypeName;
      this.nullable = nullable;
      this.precision = precision;
      this.scale = scale;
    }

    private static JdbcFieldInfo of(
        String columnLabel, int columnType, String columnTypeName, boolean nullable) {
      return new JdbcFieldInfo(columnLabel, columnType, columnTypeName, nullable, 0, 0);
    }

    @SuppressWarnings("unused")
    private static JdbcFieldInfo of(String columnLabel, int columnType, boolean nullable) {
      return new JdbcFieldInfo(columnLabel, columnType, null, nullable, 0, 0);
    }

    private static JdbcFieldInfo of(String columnLabel, int columnType) {
      return new JdbcFieldInfo(columnLabel, columnType, null, false, 0, 0);
    }

    private static JdbcFieldInfo of(String columnLabel, int columnType, int precision) {
      return new JdbcFieldInfo(columnLabel, columnType, null, false, precision, 0);
    }

    private static JdbcFieldInfo of(String columnLabel, int columnType, int precision, int scale) {
      return new JdbcFieldInfo(columnLabel, columnType, null, false, precision, scale);
    }
  }

  /** Verifies that {@link SchemaUtil#compareSchemaField} compares both field name and type. */
  @Test
  public void testSchemaFieldComparator() {
    assertTrue(
        SchemaUtil.compareSchemaField(
            Schema.Field.of("name", Schema.FieldType.STRING),
            Schema.Field.of("name", Schema.FieldType.STRING)));
    assertFalse(
        SchemaUtil.compareSchemaField(
            Schema.Field.of("name", Schema.FieldType.STRING),
            Schema.Field.of("anotherName", Schema.FieldType.STRING)));
    assertFalse(
        SchemaUtil.compareSchemaField(
            Schema.Field.of("name", Schema.FieldType.STRING),
            Schema.Field.of("name", Schema.FieldType.INT64)));
  }

  /**
   * Verifies that {@link SchemaUtil#compareSchemaFieldType} matches identical types. It should
   * treat a JDBC variable-length string logical type as equivalent to {@code STRING} in either
   * argument order, and reject unrelated types.
   */
  @Test
  public void testSchemaFieldTypeComparator() {
    assertTrue(SchemaUtil.compareSchemaFieldType(Schema.FieldType.STRING, Schema.FieldType.STRING));
    assertFalse(SchemaUtil.compareSchemaFieldType(Schema.FieldType.STRING, Schema.FieldType.INT16));
    assertTrue(
        SchemaUtil.compareSchemaFieldType(
            LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255),
            LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255)));
    assertFalse(
        SchemaUtil.compareSchemaFieldType(
            LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255),
            LogicalTypes.fixedLengthBytes(JDBCType.BIT, 255)));
    assertTrue(
        SchemaUtil.compareSchemaFieldType(
            Schema.FieldType.STRING, LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255)));
    assertFalse(
        SchemaUtil.compareSchemaFieldType(
            Schema.FieldType.INT16, LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255)));
    assertTrue(
        SchemaUtil.compareSchemaFieldType(
            LogicalTypes.variableLengthString(JDBCType.VARCHAR, 255), Schema.FieldType.STRING));
  }
}