/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.kudu.table;

import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.TableSink;

import org.apache.kudu.Type;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
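
/**
 * Integration tests for the Kudu table factory: creating Kudu-backed tables via SQL DDL,
 * inserting rows through the Table API, and constructing a {@link KuduTableSink} from
 * connector properties.
 */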
public class KuduTableFactoryTest extends KuduTestBase {
    private StreamTableEnvironment tableEnv;
    private String kuduMasters;

    @BeforeEach
    public void init() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
        kuduMasters = harness.getMasterAddressesAsString();
    }

    @Test
    public void testMissingMasters() throws Exception {
        tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')");
        assertThrows(TableException.class,
                () -> tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)"));
    }

    @Test
    public void testNonExistingTable() throws Exception {
        tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')");
        JobClient jobClient = tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)").getJobClient().get();
        try {
            jobClient.getJobExecutionResult().get();
            fail();
        } catch (ExecutionException ee) {
            assertTrue(ee.getCause() instanceof JobExecutionException);
        }
    }

    @Test
    public void testCreateTable() throws Exception {
        tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " +
                "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
        tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 's')")
                .getJobClient()
                .get()
                .getJobExecutionResult()
                .get(1, TimeUnit.MINUTES);
        validateSingleKey("TestTable11");
    }

    @Test
    public void testTimestamp() throws Exception {
        // Timestamp should be bridged to sql.Timestamp
        // Test it when creating the table...
        tableEnv.executeSql("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
                "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "', " +
                "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
        tableEnv.executeSql("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')")
                .getJobClient()
                .get()
                .getJobExecutionResult()
                .get(1, TimeUnit.MINUTES);
        tableEnv.executeSql("INSERT INTO TestTableTs values ('s', TIMESTAMP '2020-02-02 23:23:23')")
                .getJobClient()
                .get()
                .getJobExecutionResult()
                .get(1, TimeUnit.MINUTES);
        KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
        assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());

        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
        HashSet<Timestamp> results = new HashSet<>();
        scanner.forEach(sc -> results.add(sc.getTimestamp("second")));
        assertEquals(2, results.size());

        // TIMESTAMP(3) has millisecond precision, so the microsecond part of the first insert is truncated.
        List<Timestamp> expected = Lists.newArrayList(
                Timestamp.valueOf("2020-01-01 12:12:12.123"),
                Timestamp.valueOf("2020-02-02 23:23:23"));
        assertEquals(new HashSet<>(expected), results);
    }

    @Test
    public void testExistingTable() throws Exception {
        // Creating a table
        tableEnv.executeSql("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " +
                "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
        tableEnv.executeSql("INSERT INTO TestTable12 values ('f', 's')")
                .getJobClient()
                .get()
                .getJobExecutionResult()
                .get(1, TimeUnit.MINUTES);

        // Then another one in SQL that refers to the previously created one
        tableEnv.executeSql("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
                "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')");
        tableEnv.executeSql("INSERT INTO TestTable12b values ('f2','s2')")
                .getJobClient()
                .get()
                .getJobExecutionResult()
                .get(1, TimeUnit.MINUTES);

        // Validate that both insertions were into the same table
        KuduTable kuduTable = harness.getClient().openTable("TestTable12");
        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
        List<RowResult> rows = new ArrayList<>();
        scanner.forEach(rows::add);
        assertEquals(2, rows.size());
        assertEquals("f", rows.get(0).getString("first"));
        assertEquals("s", rows.get(0).getString("second"));
        assertEquals("f2", rows.get(1).getString("first"));
        assertEquals("s2", rows.get(1).getString("second"));
    }

    @Test
    public void testTableSink() {
        final TableSchema schema = TableSchema.builder()
                .field("first", DataTypes.STRING())
                .field("second", DataTypes.STRING())
                .build();

        final Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "kudu");
        properties.put("kudu.masters", kuduMasters);
        properties.put("kudu.table", "TestTable12");
        properties.put("kudu.ignore-not-found", "true");
        properties.put("kudu.ignore-duplicate", "true");
        properties.put("kudu.flush-interval", "10000");
        properties.put("kudu.max-buffer-size", "10000");

        // The sink discovered from the properties should equal one built directly from the
        // corresponding KuduWriterConfig and KuduTableInfo.
        KuduWriterConfig.Builder builder = KuduWriterConfig.Builder.setMasters(kuduMasters)
                .setFlushInterval(10000)
                .setMaxBufferSize(10000)
                .setIgnoreDuplicate(true)
                .setIgnoreNotFound(true);
        KuduTableInfo kuduTableInfo = KuduTableInfo.forTable("TestTable12");
        KuduTableSink expected = new KuduTableSink(builder, kuduTableInfo, schema);

        final TableSink<?> actualSink = TableFactoryService.find(TableSinkFactory.class, properties)
                .createTableSink(ObjectPath.fromString("default.TestTable12"), new CatalogTableImpl(schema, properties, null));
        assertEquals(expected, actualSink);
    }
}