blob: f66a143323e52a4bf80d939bcb582286e2637c70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jdbc;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.registerJdbcDriver;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;
import javax.sql.DataSource;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class JdbcWriteSchemaTransformProviderTest {
private static final JdbcIO.DataSourceConfiguration DATA_SOURCE_CONFIGURATION =
JdbcIO.DataSourceConfiguration.create(
"org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:testDB;create=true");
private static final DataSource DATA_SOURCE = DATA_SOURCE_CONFIGURATION.buildDatasource();
private String writeTableName;
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@BeforeClass
public static void beforeClass() throws Exception {
// by default, derby uses a lock timeout of 60 seconds. In order to speed up the test
// and detect the lock faster, we decrease this timeout
System.setProperty("derby.locks.waitTimeout", "2");
System.setProperty("derby.stream.error.file", "build/derby.log");
registerJdbcDriver(
ImmutableMap.of(
"derby", Objects.requireNonNull(DATA_SOURCE_CONFIGURATION.getDriverClassName()).get()));
}
@Before
public void before() throws SQLException {
writeTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
DatabaseTestHelper.createTable(DATA_SOURCE, writeTableName);
}
@Test
public void testInvalidWriteSchemaOptions() {
assertThrows(
IllegalArgumentException.class,
() -> {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setDriverClassName("")
.setJdbcUrl("")
.build()
.validate();
});
assertThrows(
IllegalArgumentException.class,
() -> {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setDriverClassName("ClassName")
.setJdbcUrl("JdbcUrl")
.setLocation("Location")
.setWriteStatement("WriteStatement")
.build()
.validate();
});
assertThrows(
IllegalArgumentException.class,
() -> {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setDriverClassName("ClassName")
.setJdbcUrl("JdbcUrl")
.build()
.validate();
});
assertThrows(
IllegalArgumentException.class,
() -> {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setJdbcUrl("JdbcUrl")
.setLocation("Location")
.setJdbcType("invalidType")
.build()
.validate();
});
assertThrows(
IllegalArgumentException.class,
() -> {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setJdbcUrl("JdbcUrl")
.setLocation("Location")
.build()
.validate();
});
assertThrows(
IllegalArgumentException.class,
() -> {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setJdbcUrl("JdbcUrl")
.setLocation("Location")
.setDriverClassName("ClassName")
.setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
.build()
.validate();
});
}
@Test
public void testValidWriteSchemaOptions() {
for (String jdbcType : JDBC_DRIVER_MAP.keySet()) {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setJdbcUrl("JdbcUrl")
.setLocation("Location")
.setJdbcType(jdbcType)
.build()
.validate();
}
}
@Test
public void testWriteToTable() throws SQLException {
JdbcWriteSchemaTransformProvider provider = null;
for (SchemaTransformProvider p : ServiceLoader.load(SchemaTransformProvider.class)) {
if (p instanceof JdbcWriteSchemaTransformProvider) {
provider = (JdbcWriteSchemaTransformProvider) p;
break;
}
}
assertNotNull(provider);
Schema schema =
Schema.of(
Schema.Field.of("id", Schema.FieldType.INT64),
Schema.Field.of("name", Schema.FieldType.STRING));
List<Row> rows =
ImmutableList.of(
Row.withSchema(schema).attachValues(1L, "name1"),
Row.withSchema(schema).attachValues(2L, "name2"));
PCollectionRowTuple.of("input", pipeline.apply(Create.of(rows).withRowSchema(schema)))
.apply(
provider.from(
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
.setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
.setLocation(writeTableName)
.build()));
pipeline.run();
DatabaseTestHelper.assertRowCount(DATA_SOURCE, writeTableName, 2);
}
@Test
public void testWriteToTableWithJdbcTypeSpecified() throws SQLException {
JdbcWriteSchemaTransformProvider provider = null;
for (SchemaTransformProvider p : ServiceLoader.load(SchemaTransformProvider.class)) {
if (p instanceof JdbcWriteSchemaTransformProvider) {
provider = (JdbcWriteSchemaTransformProvider) p;
break;
}
}
assertNotNull(provider);
Schema schema =
Schema.of(
Schema.Field.of("id", Schema.FieldType.INT64),
Schema.Field.of("name", Schema.FieldType.STRING));
List<Row> rows =
ImmutableList.of(
Row.withSchema(schema).attachValues(1L, "name1"),
Row.withSchema(schema).attachValues(2L, "name2"));
PCollectionRowTuple.of("input", pipeline.apply(Create.of(rows).withRowSchema(schema)))
.apply(
provider.from(
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
.setJdbcType("derby")
.setLocation(writeTableName)
.build()));
pipeline.run();
DatabaseTestHelper.assertRowCount(DATA_SOURCE, writeTableName, 2);
}
}