blob: 398ba9f4a1a5153ee5e65ae415825667325046d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.kudu.table;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class KuduTableFactoryTest extends KuduTestBase {
private StreamTableEnvironment tableEnv;
private String kuduMasters;
@BeforeEach
public void init() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
kuduMasters = harness.getMasterAddressesAsString();
}
@Test
public void testMissingTable() throws Exception {
tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
"WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "')");
assertThrows(NullPointerException.class,
() -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
}
@Test
public void testMissingMasters() throws Exception {
tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')");
assertThrows(NullPointerException.class,
() -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"));
}
@Test
public void testNonExistingTable() throws Exception {
tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')");
tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)");
assertThrows(java.util.concurrent.ExecutionException.class,
() -> tableEnv.execute("test"));
}
@Test
public void testCreateTable() throws Exception {
tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " +
"'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 's')");
tableEnv.execute("test");
validateSingleKey("TestTable11");
}
@Test
public void testTimestamp() throws Exception {
// Timestamp should be bridged to sql.Timestamp
// Test it when creating the table...
tableEnv.sqlUpdate("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "', " +
"'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
tableEnv.sqlUpdate("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
tableEnv.execute("test");
// And also when inserting into existing table
tableEnv.sqlUpdate("CREATE TABLE TestTableTsE (`first` STRING, `second` TIMESTAMP(3)) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "')");
tableEnv.sqlUpdate("INSERT INTO TestTableTsE values ('s', TIMESTAMP '2020-02-02 23:23:23')");
tableEnv.execute("test");
KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
HashSet<Timestamp> results = new HashSet<>();
scanner.forEach(sc -> results.add(sc.getTimestamp("second")));
assertEquals(2, results.size());
List<Timestamp> expected = Lists.newArrayList(
Timestamp.valueOf("2020-01-01 12:12:12.123"),
Timestamp.valueOf("2020-02-02 23:23:23"));
assertEquals(new HashSet<>(expected), results);
}
@Test
public void testExistingTable() throws Exception {
// Creating a table
tableEnv.sqlUpdate("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " +
"'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
tableEnv.sqlUpdate("INSERT INTO TestTable12 values ('f', 's')");
tableEnv.execute("test");
// Then another one in SQL that refers to the previously created one
tableEnv.sqlUpdate("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " +
"WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')");
tableEnv.sqlUpdate("INSERT INTO TestTable12b values ('f2','s2')");
tableEnv.execute("test2");
// Validate that both insertions were into the same table
KuduTable kuduTable = harness.getClient().openTable("TestTable12");
KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
assertEquals(2, rows.size());
assertEquals("f", rows.get(0).getString("first"));
assertEquals("s", rows.get(0).getString("second"));
assertEquals("f2", rows.get(1).getString("first"));
assertEquals("s2", rows.get(1).getString("second"));
}
}