blob: f6941082a1d9685b3a42360328e5a24806c2e1ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.kudu.table;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
import org.apache.flink.connectors.kudu.streaming.KuduSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests for {@link KuduCatalog}: tables are created, populated, renamed and dropped through
 * Flink SQL and the DataStream API, and the outcome is verified by reading the tables back
 * with the Kudu client obtained from {@code harness}.
 *
 * <p>{@code harness} and {@code validateSingleKey} are not declared in this class; presumably
 * they are inherited from {@link KuduTestBase}.
 */
public class KuduCatalogTest extends KuduTestBase {

    /** Catalog registered with {@link #tableEnv}; re-created before every test. */
    private KuduCatalog catalog;

    /** Table environment whose current catalog is the Kudu catalog (registered as "kudu"). */
    private StreamTableEnvironment tableEnv;

    /** Builds a fresh catalog and table environment and makes the Kudu catalog the current one. */
    @BeforeEach
    public void init() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
        tableEnv.registerCatalog("kudu", catalog);
        tableEnv.useCatalog("kudu");
    }

    /** Creates a single-key table, inserts one row, renames the table and finally drops it. */
    @Test
    public void testCreateAlterDrop() throws Exception {
        tableEnv.sqlUpdate("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
        tableEnv.sqlUpdate("INSERT INTO TestTable1 VALUES ('f', 's')");
        // Add this once Primary key support has been enabled
        // tableEnv.sqlUpdate("CREATE TABLE TestTable2 (`first` STRING, `second` String, PRIMARY KEY(`first`)) WITH ('kudu.hash-columns' = 'first')");
        // tableEnv.sqlUpdate("INSERT INTO TestTable2 VALUES ('f', 's')");
        tableEnv.execute("test");

        validateSingleKey("TestTable1");
        // validateSingleKey("TestTable2");

        // The same validation must still pass under the new name after the rename.
        tableEnv.sqlUpdate("ALTER TABLE TestTable1 RENAME TO TestTable1R");
        validateSingleKey("TestTable1R");

        tableEnv.sqlUpdate("DROP TABLE TestTable1R");
        assertFalse(harness.getClient().tableExists("TestTable1R"));
    }

    /** Creates a table with a two-column primary key, inserts one row and validates it. */
    @Test
    public void testCreateAndInsertMultiKey() throws Exception {
        tableEnv.sqlUpdate("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')");
        tableEnv.sqlUpdate("INSERT INTO TestTable3 VALUES ('f', 2, 't')");
        // Add this once Primary key support has been enabled
        // tableEnv.sqlUpdate("CREATE TABLE TestTable4 (`first` STRING, `second` INT, `third` STRING) PRIMARY KEY (`first`, `second`) WITH ('kudu.hash-columns' = 'first,second')");
        // tableEnv.sqlUpdate("INSERT INTO TestTable4 VALUES ('f', 2, 't')");
        tableEnv.execute("test");

        validateMultiKey("TestTable3");
        // validateMultiKey("TestTable4");
    }

    /**
     * Copies two of the three columns of one Kudu table into another one, selecting them in a
     * different order than they are declared in the source table.
     */
    @Test
    public void testSourceProjection() throws Exception {
        tableEnv.sqlUpdate("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')");
        tableEnv.sqlUpdate("INSERT INTO TestTable5 VALUES ('s', 'f', 't')");
        tableEnv.execute("test");

        tableEnv.sqlUpdate("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
        tableEnv.sqlUpdate("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)");
        tableEnv.execute("test");

        validateSingleKey("TestTable6");
    }

    /** Runs {@code SELECT COUNT(*)}, a query that reads no column of the table, over two rows. */
    @Test
    public void testEmptyProjection() throws Exception {
        tableEnv.sqlUpdate("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
        tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f','s')");
        tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f2','s2')");
        tableEnv.execute("test");

        Table result = tableEnv.sqlQuery("SELECT COUNT(*) FROM TestTableEP");
        DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(result, Types.ROW(Types.LONG));

        CollectionSink.output.clear();
        resultDataStream
                .map(t -> Tuple2.of(t.f0, t.f1.getField(0)))
                .returns(Types.TUPLE(Types.BOOLEAN, Types.LONG))
                .addSink(new CollectionSink<>()).setParallelism(1);
        tableEnv.execute("test");

        // Compared as sets, so the order in which the retract messages arrive is not checked.
        List<Tuple2<Boolean, Long>> expected = Lists.newArrayList(Tuple2.of(true, 1L), Tuple2.of(false, 1L), Tuple2.of(true, 2L));
        assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
        CollectionSink.output.clear();
    }

    /**
     * Creates a table through the catalog, writes three tuples with a {@link KuduSink} and reads
     * them back through the table source produced by the catalog's table factory.
     *
     * <p>The method name contains a typo ("EndToEst"); it is left unchanged so that any
     * name-based test selection keeps working.
     */
    @Test
    public void dataStreamEndToEstTest() throws Exception {
        // A dedicated catalog instance, deliberately named differently from the field to avoid shadowing it.
        KuduCatalog kuduCatalog = new KuduCatalog(harness.getMasterAddressesAsString());

        // Creating table through catalog
        KuduTableFactory tableFactory = kuduCatalog.getKuduTableFactory();
        KuduTableInfo tableInfo = KuduTableInfo.forTable("TestTable7").createTableIfNotExists(
                () ->
                        Lists.newArrayList(
                                new ColumnSchema
                                        .ColumnSchemaBuilder("k", Type.INT32)
                                        .key(true)
                                        .build(),
                                new ColumnSchema
                                        .ColumnSchemaBuilder("v", Type.STRING)
                                        .build()
                        ),
                () -> new CreateTableOptions()
                        .setNumReplicas(1)
                        .addHashPartitions(Lists.newArrayList("k"), 2));
        kuduCatalog.createTable(tableInfo, false);

        ObjectPath path = kuduCatalog.getObjectPath("TestTable7");
        CatalogTable table = kuduCatalog.getTable(path);

        List<Tuple2<Integer, String>> input = Lists.newArrayList(Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));

        // Writing with simple sink
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(harness.getMasterAddressesAsString()).build();
        env.fromCollection(input).addSink(
                new KuduSink<>(
                        writerConfig,
                        tableInfo,
                        new TupleOperationMapper<>(
                                new String[]{"k", "v"},
                                AbstractSingleOperationMapper.KuduOperation.INSERT)
                )
        );
        env.execute();

        // Reading and validating data
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        CollectionSink.output.clear();
        tableFactory.createTableSource(path, table)
                .getDataStream(env)
                .map(row -> Tuple2.of((int) row.getField(0), (String) row.getField(1)))
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                })
                .addSink(new CollectionSink<>()).setParallelism(1);
        env.execute();

        List<Tuple2<Integer, String>> expected = Lists.newArrayList(Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));
        assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output));
        CollectionSink.output.clear();
    }

    /**
     * Inserts a timestamp literal with microsecond digits into a {@code TIMESTAMP(3)} column and
     * expects to read it back with only the millisecond part.
     */
    @Test
    public void testTimestamp() throws Exception {
        tableEnv.sqlUpdate("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " +
                "WITH ('kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')");
        tableEnv.sqlUpdate("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')");
        tableEnv.execute("test");

        KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
        assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());

        List<RowResult> rows = scanAllRows(kuduTable);
        assertEquals(1, rows.size());
        assertEquals("f", rows.get(0).getString(0));
        assertEquals(Timestamp.valueOf("2020-01-01 12:12:12.123"), rows.get(0).getTimestamp(1));
    }

    /** Creates a table with one column per tested SQL type, inserts one row and validates it. */
    @Test
    public void testDatatypes() throws Exception {
        tableEnv.sqlUpdate("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," +
                "`fourth` TINYINT, `fifth` SMALLINT, `sixth` INT, `seventh` BIGINT, `eighth` FLOAT, `ninth` DOUBLE, " +
                "`tenth` TIMESTAMP)" +
                "WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')");
        tableEnv.sqlUpdate("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," +
                "cast(34 as SMALLINT), 56, cast(78 as BIGINT), cast(3.14 as FLOAT), cast(1.2345 as DOUBLE)," +
                "TIMESTAMP '2020-04-15 12:34:56.123') ");
        tableEnv.execute("test");

        validateManyTypes("TestTable8");
    }

    /**
     * Expects {@link TableException} from three {@code CREATE TABLE} statements: one without
     * {@code kudu.hash-columns}, one without {@code kudu.primary-key-columns}, and one whose hash
     * column ({@code first}) is not the declared primary key column ({@code second}).
     */
    @Test
    public void testMissingPropertiesCatalog() throws Exception {
        assertThrows(TableException.class,
                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9a (`first` STRING, `second` String) " +
                        "WITH ('kudu.primary-key-columns' = 'second')"));
        assertThrows(TableException.class,
                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
                        "WITH ('kudu.hash-columns' = 'first')"));
        assertThrows(TableException.class,
                () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " +
                        "WITH ('kudu.primary-key-columns' = 'second', 'kudu.hash-columns' = 'first')"));
    }

    /**
     * Checks the Kudu column types of the table written by {@link #testDatatypes()} and the
     * values of its single row.
     *
     * @param tableName name of the Kudu table to open
     */
    private void validateManyTypes(String tableName) throws Exception {
        KuduTable kuduTable = harness.getClient().openTable(tableName);
        Schema schema = kuduTable.getSchema();

        assertEquals(Type.STRING, schema.getColumn("first").getType());
        assertEquals(Type.BOOL, schema.getColumn("second").getType());
        assertEquals(Type.BINARY, schema.getColumn("third").getType());
        assertEquals(Type.INT8, schema.getColumn("fourth").getType());
        assertEquals(Type.INT16, schema.getColumn("fifth").getType());
        assertEquals(Type.INT32, schema.getColumn("sixth").getType());
        assertEquals(Type.INT64, schema.getColumn("seventh").getType());
        assertEquals(Type.FLOAT, schema.getColumn("eighth").getType());
        assertEquals(Type.DOUBLE, schema.getColumn("ninth").getType());
        assertEquals(Type.UNIXTIME_MICROS, schema.getColumn("tenth").getType());

        List<RowResult> rows = scanAllRows(kuduTable);
        assertEquals(1, rows.size());

        RowResult row = rows.get(0);
        assertEquals("f", row.getString(0));
        assertFalse(row.getBoolean(1));
        // Explicit charset: the expected bytes must not depend on the JVM's default encoding.
        assertEquals(ByteBuffer.wrap("bbbb".getBytes(StandardCharsets.UTF_8)), row.getBinary(2));
        assertEquals(12, row.getByte(3));
        assertEquals(34, row.getShort(4));
        assertEquals(56, row.getInt(5));
        assertEquals(78, row.getLong(6));
        assertEquals(3.14, row.getFloat(7), 0.01);
        assertEquals(1.2345, row.getDouble(8), 0.0001);
        assertEquals(Timestamp.valueOf("2020-04-15 12:34:56.123"), row.getTimestamp(9));
    }

    /**
     * Checks that the table has three columns of which {@code first} and {@code second} form
     * the primary key, and that it holds exactly the row {@code ('f', 2, 't')}.
     *
     * @param tableName name of the Kudu table to open
     */
    private void validateMultiKey(String tableName) throws Exception {
        KuduTable kuduTable = harness.getClient().openTable(tableName);
        Schema schema = kuduTable.getSchema();

        assertEquals(2, schema.getPrimaryKeyColumnCount());
        assertEquals(3, schema.getColumnCount());
        assertTrue(schema.getColumn("first").isKey());
        assertTrue(schema.getColumn("second").isKey());
        assertFalse(schema.getColumn("third").isKey());

        List<RowResult> rows = scanAllRows(kuduTable);
        assertEquals(1, rows.size());

        RowResult row = rows.get(0);
        assertEquals("f", row.getString("first"));
        assertEquals(2, row.getInt("second"));
        assertEquals("t", row.getString("third"));
    }

    /**
     * Scans the given table with a scanner built without any further configuration and collects
     * every row it returns.
     *
     * @param kuduTable table to read
     * @return the rows in the order the scanner produced them
     */
    private List<RowResult> scanAllRows(KuduTable kuduTable) throws Exception {
        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
        List<RowResult> rows = new ArrayList<>();
        scanner.forEach(rows::add);
        return rows;
    }

    /**
     * Sink that appends every element it receives to the shared static {@link #output} list so
     * that a test can inspect the elements after the job has run. The list is shared by all
     * instances, so tests clear it before and after use.
     *
     * @param <T> type of the elements consumed by the sink
     */
    public static class CollectionSink<T> implements SinkFunction<T> {

        /** Elements collected so far, held in a synchronized list. */
        public static final List<Object> output = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(T value, SinkFunction.Context context) throws Exception {
            output.add(value);
        }
    }
}