blob: c9cabb37c7497ac4b01e324606996239d6a00856 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.hbase1;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase1.source.AbstractTableInputFormat;
import org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat;
import org.apache.flink.connector.hbase1.util.HBaseTestBase;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for HBase connector (source and sink). */
class HBaseConnectorITCase extends HBaseTestBase {
// -------------------------------------------------------------------------------------
// HBaseTableSource tests
// -------------------------------------------------------------------------------------
/**
 * Maps {@code TEST_TABLE_1} as {@code hTable} in a batch table environment, selects the six
 * qualifiers of family1..family3 without any filter and compares the collected rows with the
 * eight expected rows.
 */
@Test
void testTableSourceFullScan() {
    final TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

    final String createTableDdl =
            "CREATE TABLE hTable ("
                    + " family1 ROW<col1 INT>,"
                    + " family2 ROW<col1 STRING, col2 BIGINT>,"
                    + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>,"
                    + " rowkey INT,"
                    + " PRIMARY KEY (rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_1
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    batchTableEnv.executeSql(createTableDdl);

    final String selectAllQualifiers =
            "SELECT "
                    + " h.family1.col1, "
                    + " h.family2.col1, "
                    + " h.family2.col2, "
                    + " h.family3.col1, "
                    + " h.family3.col2, "
                    + " h.family3.col3 "
                    + "FROM hTable AS h";
    final Table scanned = batchTableEnv.sqlQuery(selectAllQualifiers);
    final List<Row> actualRows = CollectionUtil.iteratorToList(scanned.execute().collect());

    final String expectedRows =
            "+I[10, Hello-1, 100, 1.01, false, Welt-1]\n"
                    + "+I[20, Hello-2, 200, 2.02, true, Welt-2]\n"
                    + "+I[30, Hello-3, 300, 3.03, false, Welt-3]\n"
                    + "+I[40, null, 400, 4.04, true, Welt-4]\n"
                    + "+I[50, Hello-5, 500, 5.05, false, Welt-5]\n"
                    + "+I[60, Hello-6, 600, 6.06, true, Welt-6]\n"
                    + "+I[70, Hello-7, 700, 7.07, false, Welt-7]\n"
                    + "+I[80, null, 800, 8.08, true, Welt-8]\n";
    TestBaseUtils.compareResultAsText(actualRows, expectedRows);
}
/**
 * Maps {@code TEST_EMPTY_TABLE} as {@code hTable} in a batch table environment and asserts that
 * selecting from it yields no rows.
 */
@Test
void testTableSourceEmptyTableScan() {
    final TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

    final String createTableDdl =
            "CREATE TABLE hTable ("
                    + " family1 ROW<col1 INT>,"
                    + " rowkey INT,"
                    + " PRIMARY KEY (rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_EMPTY_TABLE
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    batchTableEnv.executeSql(createTableDdl);

    final Table scanned = batchTableEnv.sqlQuery("SELECT rowkey, h.family1.col1 FROM hTable AS h");
    final List<Row> actualRows = CollectionUtil.iteratorToList(scanned.execute().collect());

    assertThat(actualRows).isEmpty();
}
/**
 * Declares all three families of {@code TEST_TABLE_1} but selects only {@code family1.col1} and
 * the three {@code family3} qualifiers, then compares the collected rows with the eight expected
 * rows.
 */
@Test
void testTableSourceProjection() {
    final TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

    final String createTableDdl =
            "CREATE TABLE hTable ("
                    + " family1 ROW<col1 INT>,"
                    + " family2 ROW<col1 STRING, col2 BIGINT>,"
                    + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>,"
                    + " rowkey INT,"
                    + " PRIMARY KEY (rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_1
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    batchTableEnv.executeSql(createTableDdl);

    final String selectProjectedQualifiers =
            "SELECT "
                    + " h.family1.col1, "
                    + " h.family3.col1, "
                    + " h.family3.col2, "
                    + " h.family3.col3 "
                    + "FROM hTable AS h";
    final Table projected = batchTableEnv.sqlQuery(selectProjectedQualifiers);
    final List<Row> actualRows = CollectionUtil.iteratorToList(projected.execute().collect());

    final String expectedRows =
            "+I[10, 1.01, false, Welt-1]\n"
                    + "+I[20, 2.02, true, Welt-2]\n"
                    + "+I[30, 3.03, false, Welt-3]\n"
                    + "+I[40, 4.04, true, Welt-4]\n"
                    + "+I[50, 5.05, false, Welt-5]\n"
                    + "+I[60, 6.06, true, Welt-6]\n"
                    + "+I[70, 7.07, false, Welt-7]\n"
                    + "+I[80, 8.08, true, Welt-8]\n";
    TestBaseUtils.compareResultAsText(actualRows, expectedRows);
}
/**
 * Declares the columns in the order rowkey, family2, family3, family1 and checks that
 * {@code SELECT *} returns the nested rows in exactly that declared order.
 */
@Test
void testTableSourceFieldOrder() {
    final TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

    final String createTableDdl =
            "CREATE TABLE hTable ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family2 ROW<col1 STRING, col2 BIGINT>,"
                    + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>,"
                    + " family1 ROW<col1 INT>"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_1
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    batchTableEnv.executeSql(createTableDdl);

    final Table everything = batchTableEnv.sqlQuery("SELECT * FROM hTable AS h");
    final List<Row> actualRows = CollectionUtil.iteratorToList(everything.execute().collect());

    final String expectedRows =
            "+I[1, +I[Hello-1, 100], +I[1.01, false, Welt-1], +I[10]]\n"
                    + "+I[2, +I[Hello-2, 200], +I[2.02, true, Welt-2], +I[20]]\n"
                    + "+I[3, +I[Hello-3, 300], +I[3.03, false, Welt-3], +I[30]]\n"
                    + "+I[4, +I[null, 400], +I[4.04, true, Welt-4], +I[40]]\n"
                    + "+I[5, +I[Hello-5, 500], +I[5.05, false, Welt-5], +I[50]]\n"
                    + "+I[6, +I[Hello-6, 600], +I[6.06, true, Welt-6], +I[60]]\n"
                    + "+I[7, +I[Hello-7, 700], +I[7.07, false, Welt-7], +I[70]]\n"
                    + "+I[8, +I[null, 800], +I[8.08, true, Welt-8], +I[80]]\n";
    TestBaseUtils.compareResultAsText(actualRows, expectedRows);
}
/**
 * Declares both {@code family2} qualifiers as {@code BYTES}, decodes them in SQL with the
 * {@link ToUTF8} and {@link ToLong} scalar functions and compares the decoded values with the
 * eight expected rows.
 */
@Test
void testTableSourceReadAsByteArray() {
    final TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

    final String createTableDdl =
            "CREATE TABLE hTable ("
                    + " family2 ROW<col1 BYTES, col2 BYTES>,"
                    + " rowkey INT"
                    + // no primary key syntax
                    ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_1
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    batchTableEnv.executeSql(createTableDdl);

    // Register the decoders under the names used by the query below.
    batchTableEnv.registerFunction("toUTF8", new ToUTF8());
    batchTableEnv.registerFunction("toLong", new ToLong());

    final String selectDecoded =
            "SELECT "
                    + " toUTF8(h.family2.col1), "
                    + " toLong(h.family2.col2) "
                    + "FROM hTable AS h";
    final Table decoded = batchTableEnv.sqlQuery(selectDecoded);
    final List<Row> actualRows = CollectionUtil.iteratorToList(decoded.execute().collect());

    final String expectedRows =
            "+I[Hello-1, 100]\n"
                    + "+I[Hello-2, 200]\n"
                    + "+I[Hello-3, 300]\n"
                    + "+I[null, 400]\n"
                    + "+I[Hello-5, 500]\n"
                    + "+I[Hello-6, 600]\n"
                    + "+I[Hello-7, 700]\n"
                    + "+I[null, 800]\n";
    TestBaseUtils.compareResultAsText(actualRows, expectedRows);
}
/**
 * Copies {@code TEST_TABLE_1} into {@code TEST_TABLE_2} with a streaming INSERT and then reads
 * {@code TEST_TABLE_2} back through a separate batch table environment to verify the written
 * rows.
 */
@Test
void testTableSink() throws Exception {
    final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment streamTableEnv =
            StreamTableEnvironment.create(streamEnv, streamSettings);

    // Source table: testTable1, which already contains the test data.
    final String sourceDdl = createHBaseTableDDL(TEST_TABLE_1, false);
    streamTableEnv.executeSql(sourceDdl);
    // Sink table.
    final String sinkDdl = createHBaseTableDDL(TEST_TABLE_2, false);
    streamTableEnv.executeSql(sinkDdl);

    final String copyStatement =
            "INSERT INTO "
                    + TEST_TABLE_2
                    + " SELECT"
                    + " rowkey,"
                    + " family1,"
                    + " family2,"
                    + " family3"
                    + " FROM "
                    + TEST_TABLE_1;
    // Block until the insert job has finished before reading the sink back.
    streamTableEnv.executeSql(copyStatement).await();

    // Verify the sink contents with a batch scan job.
    final TableEnvironment verifyEnv = TableEnvironment.create(batchSettings);
    verifyEnv.executeSql(sinkDdl);
    final String verifyQuery =
            "SELECT "
                    + " h.rowkey, "
                    + " h.family1.col1, "
                    + " h.family2.col1, "
                    + " h.family2.col2, "
                    + " h.family3.col1, "
                    + " h.family3.col2, "
                    + " h.family3.col3 "
                    + "FROM "
                    + TEST_TABLE_2
                    + " AS h";
    final Table written = verifyEnv.sqlQuery(verifyQuery);
    final List<Row> actualRows = CollectionUtil.iteratorToList(written.execute().collect());

    final String expectedRows =
            "+I[1, 10, Hello-1, 100, 1.01, false, Welt-1]\n"
                    + "+I[2, 20, Hello-2, 200, 2.02, true, Welt-2]\n"
                    + "+I[3, 30, Hello-3, 300, 3.03, false, Welt-3]\n"
                    + "+I[4, 40, null, 400, 4.04, true, Welt-4]\n"
                    + "+I[5, 50, Hello-5, 500, 5.05, false, Welt-5]\n"
                    + "+I[6, 60, Hello-6, 600, 6.06, true, Welt-6]\n"
                    + "+I[7, 70, Hello-7, 700, 7.07, false, Welt-7]\n"
                    + "+I[8, 80, null, 800, 8.08, true, Welt-8]\n";
    TestBaseUtils.compareResultAsText(actualRows, expectedRows);
}
/**
 * Feeds a changelog of INSERT and DELETE rows from a {@code values} source table into an HBase
 * sink backed by {@code TEST_TABLE_4} and asserts that reading the sink back yields the single
 * row {@code (1, (Hello3))}.
 */
@Test
void testTableSinkWithChangelog() throws Exception {
    final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment streamTableEnv =
            StreamTableEnvironment.create(streamEnv, streamSettings);

    // Changelog fed into the values source table.
    final List<Row> changelog =
            Arrays.asList(
                    Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")),
                    Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")),
                    Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")),
                    Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")),
                    Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")),
                    Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")),
                    Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3")));
    final String registeredDataId = TestValuesTableFactory.registerData(changelog);

    final String sourceDdl =
            "CREATE TABLE source_table ("
                    + " rowkey INT,"
                    + " family1 ROW<name STRING>,"
                    + " PRIMARY KEY (rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'values',"
                    + " 'data-id' = '"
                    + registeredDataId
                    + "',"
                    + " 'changelog-mode'='I,UA,UB,D'"
                    + ")";
    streamTableEnv.executeSql(sourceDdl);

    // HBase table acting as the sink.
    final String sinkDdl =
            "CREATE TABLE sink_table ("
                    + " rowkey INT,"
                    + " family1 ROW<name STRING>,"
                    + " PRIMARY KEY (rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_4
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    streamTableEnv.executeSql(sinkDdl);

    streamTableEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await();

    final TableResult sinkContents = streamTableEnv.executeSql("SELECT * FROM sink_table");
    final List<Row> actualRows = CollectionUtil.iteratorToList(sinkContents.collect());
    final List<Row> expectedRows = Collections.singletonList(Row.of(1, Row.of("Hello3")));
    assertThat(actualRows).isEqualTo(expectedRows);
}
/**
 * Writes rows to {@code TEST_TABLE_5} through a sink table whose {@code version} column is mapped
 * to the {@code timestamp} metadata key, then reads the table back without the metadata column.
 *
 * <p>Row keys 1 and 2 are each written twice with different timestamps; the expected result
 * contains, for each of them, the value that was written with the larger timestamp.
 */
@Test
void testTableSinkWithTimestampMetadata() throws Exception {
    final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment streamTableEnv =
            StreamTableEnvironment.create(streamEnv, streamSettings);

    final String sinkDdl =
            "CREATE TABLE hTableForSink ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>,"
                    + " version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_5
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    streamTableEnv.executeSql(sinkDdl);

    final String insertStatement =
            "INSERT INTO hTableForSink VALUES"
                    + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
                    + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
                    + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
                    + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
                    + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
    streamTableEnv.executeSql(insertStatement).await();

    // Second mapping of the same HBase table, without the metadata column, for reading back.
    final String queryDdl =
            "CREATE TABLE hTableForQuery ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_5
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    streamTableEnv.executeSql(queryDdl);

    final TableResult readBack =
            streamTableEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
    final List<Row> actualRows = CollectionUtil.iteratorToList(readBack.collect());
    final String expectedRows = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
    TestBaseUtils.compareResultAsText(actualRows, expectedRows);
}
/**
 * Writes three rows to {@code TEST_TABLE_6} through a sink table whose {@code ttl} column is
 * mapped to the {@code ttl} metadata key (values 2000, 9000 and 5000), then queries the table
 * four times with sleeps of 3, 3 and 4 seconds in between and expects the rows to disappear one
 * after another.
 *
 * <p>NOTE(review): the expectations depend on wall-clock sleeps and presumably on the TTL values
 * being milliseconds - confirm against the connector documentation.
 */
@Test
void testTableSinkWithTTLMetadata() throws Exception {
    final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment streamTableEnv =
            StreamTableEnvironment.create(streamEnv, streamSettings);

    final String sinkDdl =
            "CREATE TABLE hTableForSink ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>,"
                    + " ttl BIGINT NOT NULL METADATA FROM 'ttl'"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_6
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    streamTableEnv.executeSql(sinkDdl);

    final String insertStatement =
            "INSERT INTO hTableForSink VALUES"
                    + "(1, ROW(1), 2000),"
                    + "(2, ROW(2), 9000),"
                    + "(3, ROW(3), 5000)";
    streamTableEnv.executeSql(insertStatement).await();

    // Second mapping of the same HBase table, without the metadata column, for reading back.
    final String queryDdl =
            "CREATE TABLE hTableForQuery ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_6
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    streamTableEnv.executeSql(queryDdl);

    final String readBackQuery = "SELECT rowkey, family1.col1 FROM hTableForQuery";

    // Immediately after the insert: all three rows are expected.
    final List<Row> rowsAtStart =
            CollectionUtil.iteratorToList(streamTableEnv.executeSql(readBackQuery).collect());
    TestBaseUtils.compareResultAsText(rowsAtStart, "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n");

    // After 3 seconds: row 1 (ttl 2000) is expected to be gone.
    TimeUnit.SECONDS.sleep(3);
    final List<Row> rowsAfterFirstWait =
            CollectionUtil.iteratorToList(streamTableEnv.executeSql(readBackQuery).collect());
    TestBaseUtils.compareResultAsText(rowsAfterFirstWait, "+I[2, 2]\n+I[3, 3]\n");

    // After 3 more seconds: row 3 (ttl 5000) is expected to be gone as well.
    TimeUnit.SECONDS.sleep(3);
    final List<Row> rowsAfterSecondWait =
            CollectionUtil.iteratorToList(streamTableEnv.executeSql(readBackQuery).collect());
    TestBaseUtils.compareResultAsText(rowsAfterSecondWait, "+I[2, 2]");

    // After 4 more seconds: row 2 (ttl 9000) is expected to be gone, leaving nothing.
    TimeUnit.SECONDS.sleep(4);
    final List<Row> rowsAtEnd =
            CollectionUtil.iteratorToList(streamTableEnv.executeSql(readBackQuery).collect());
    assertThat(rowsAtEnd).isEmpty();
}
/**
 * Copies {@code TEST_TABLE_1}, including the {@code family4} column with TIMESTAMP, DATE, TIME
 * and DECIMAL qualifiers, into {@code TEST_TABLE_3} with a streaming INSERT and verifies the
 * written rows with a batch scan of {@code TEST_TABLE_3}.
 */
@Test
void testTableSourceSinkWithDDL() throws Exception {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

    // register HBase table testTable1 which contains test data
    String table1DDL = createHBaseTableDDL(TEST_TABLE_1, true);
    tEnv.executeSql(table1DDL);
    // register HBase table which is empty
    String table3DDL = createHBaseTableDDL(TEST_TABLE_3, true);
    tEnv.executeSql(table3DDL);

    String insertStatement =
            "INSERT INTO "
                    + TEST_TABLE_3
                    + " SELECT rowkey,"
                    + " family1,"
                    + " family2,"
                    + " family3,"
                    + " family4"
                    + " from "
                    + TEST_TABLE_1;
    // wait for the insert job to finish before reading the sink back
    tEnv.executeSql(insertStatement).await();

    // start a batch scan job to verify contents in HBase table
    TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
    batchEnv.executeSql(table3DDL);
    String query =
            "SELECT "
                    + " h.rowkey, "
                    + " h.family1.col1, "
                    + " h.family2.col1, "
                    + " h.family2.col2, "
                    + " h.family3.col1, "
                    + " h.family3.col2, "
                    + " h.family3.col3, "
                    + " h.family4.col1, "
                    + " h.family4.col2, "
                    + " h.family4.col3, "
                    + " h.family4.col4 "
                    + " FROM "
                    + TEST_TABLE_3
                    + " AS h";
    // Run the verification on batchEnv: it was created and given table3DDL for exactly this
    // purpose, mirroring testTableSink. Previously the query ran on the streaming tEnv and
    // batchEnv was left unused.
    Iterator<Row> collected = batchEnv.executeSql(query).collect();
    // Sort the textual form so the comparison does not depend on scan order.
    List<String> result =
            CollectionUtil.iteratorToList(collected).stream()
                    .map(Row::toString)
                    .sorted()
                    .collect(Collectors.toList());

    List<String> expected =
            Arrays.asList(
                    "+I[1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]",
                    "+I[2, 20, Hello-2, 200, 2.02, true, Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002]",
                    "+I[3, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]",
                    "+I[4, 40, null, 400, 4.04, true, Welt-4, 2019-08-18T19:03, 2019-08-18, 19:03, 12345678.0004]",
                    "+I[5, 50, Hello-5, 500, 5.05, false, Welt-5, 2019-08-19T19:10, 2019-08-19, 19:10, 12345678.0005]",
                    "+I[6, 60, Hello-6, 600, 6.06, true, Welt-6, 2019-08-19T19:20, 2019-08-19, 19:20, 12345678.0006]",
                    "+I[7, 70, Hello-7, 700, 7.07, false, Welt-7, 2019-08-19T19:30, 2019-08-19, 19:30, 12345678.0007]",
                    "+I[8, 80, null, 800, 8.08, true, Welt-8, 2019-08-19T19:40, 2019-08-19, 19:40, 12345678.0008]");
    assertThat(result).isEqualTo(expected);
}
/**
 * Joins the in-memory stream built from {@code testData} against {@code TEST_TABLE_1} with a
 * {@code FOR SYSTEM_TIME AS OF} (temporal/lookup) join on {@code src.a = h.rowkey} and compares
 * the sorted textual result with the four expected rows.
 */
@Test
void testHBaseLookupTableSource() {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

    tEnv.executeSql(
            "CREATE TABLE "
                    + TEST_TABLE_1
                    + " ("
                    + " family1 ROW<col1 INT>,"
                    + " family2 ROW<col1 STRING, col2 BIGINT>,"
                    + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>,"
                    + " rowkey INT,"
                    + " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3), col4 DECIMAL(12, 4)>,"
                    + " PRIMARY KEY (rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_1
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")");

    // prepare a source table with a processing-time attribute for the temporal join
    String srcTableName = "src";
    DataStream<Row> srcDs = execEnv.fromCollection(testData).returns(testTypeInfo);
    Table in = tEnv.fromDataStream(srcDs, $("a"), $("b"), $("c"), $("proc").proctime());
    // createTemporaryView replaces the deprecated registerTable.
    tEnv.createTemporaryView(srcTableName, in);

    // perform a temporal table join query
    String dimJoinQuery =
            "SELECT"
                    + " a,"
                    + " b,"
                    + " h.family1.col1,"
                    + " h.family2.col1,"
                    + " h.family2.col2,"
                    + " h.family3.col1,"
                    + " h.family3.col2,"
                    + " h.family3.col3,"
                    + " h.family4.col1,"
                    + " h.family4.col2,"
                    + " h.family4.col3,"
                    + " h.family4.col4 "
                    + " FROM src JOIN "
                    + TEST_TABLE_1
                    + " FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rowkey";
    Iterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect();
    // Sort the textual form so the comparison does not depend on emission order.
    List<String> result =
            CollectionUtil.iteratorToList(collected).stream()
                    .map(Row::toString)
                    .sorted()
                    .collect(Collectors.toList());

    List<String> expected =
            Arrays.asList(
                    "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]",
                    "+I[2, 2, 20, Hello-2, 200, 2.02, true, Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002]",
                    "+I[3, 2, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]",
                    "+I[3, 3, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]");
    assertThat(result).isEqualTo(expected);
}
/**
 * Opens an {@link HBaseRowDataInputFormat} on the first input split of {@code TEST_TABLE_1},
 * checks that a connection and the table handle are available, then closes the format and
 * checks that the connection is no longer exposed.
 */
@Test
void testTableInputFormatOpenClose() throws IOException {
    final HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumn(FAMILY1, F1COL1, byte[].class);

    final AbstractTableInputFormat<?> format =
            new HBaseRowDataInputFormat(getConf(), TEST_TABLE_1, schema, "null");

    format.open(format.createInputSplits(1)[0]);
    assertThat(format.getConnection()).isNotNull();
    final TableName tableName = TableName.valueOf(TEST_TABLE_1);
    assertThat(format.getConnection().getTable(tableName)).isNotNull();

    format.close();
    assertThat(format.getConnection()).isNull();
}
/**
 * Asserts that creating input splits for {@code TEST_NOT_EXISTS_TABLE} throws exactly a
 * {@link TableNotFoundException}, and that after closing the format no connection is exposed.
 */
@Test
void testTableInputFormatTableExistence() throws IOException {
    final HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumn(FAMILY1, F1COL1, byte[].class);

    final AbstractTableInputFormat<?> missingTableFormat =
            new HBaseRowDataInputFormat(getConf(), TEST_NOT_EXISTS_TABLE, schema, "null");

    assertThatThrownBy(() -> missingTableFormat.createInputSplits(1))
            .isExactlyInstanceOf(TableNotFoundException.class);

    missingTableFormat.close();
    assertThat(missingTableFormat.getConnection()).isNull();
}
/**
 * Asserts that opening an {@link HBaseSinkFunction} targeting {@code TEST_NOT_EXISTS_TABLE}
 * fails with a {@link TableNotFoundException} as the root cause, then closes the function.
 */
@Test
void testHBaseSinkFunctionTableExistence() throws Exception {
    final org.apache.hadoop.conf.Configuration clusterConf =
            HBaseConfigurationUtil.getHBaseConfiguration();
    clusterConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum());
    clusterConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");

    final HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumn(FAMILY1, F1COL1, byte[].class);

    final RowDataToMutationConverter converter =
            new RowDataToMutationConverter(
                    schema, schema.convertToDataType(), Collections.emptyList(), "null", false);
    final HBaseSinkFunction<RowData> missingTableSink =
            new HBaseSinkFunction<>(
                    TEST_NOT_EXISTS_TABLE, clusterConf, converter, 2 * 1024 * 1024, 1000, 1000);

    assertThatThrownBy(() -> missingTableSink.open(new Configuration()))
            .getRootCause()
            .isExactlyInstanceOf(TableNotFoundException.class);

    missingTableSink.close();
}
// -------------------------------------------------------------------------------------
// Test data for the HBase lookup source test
// -------------------------------------------------------------------------------------
// prepare a source collection.
private static final List<Row> testData = new ArrayList<>();
private static final RowTypeInfo testTypeInfo =
new RowTypeInfo(
new TypeInformation[] {Types.INT, Types.LONG, Types.STRING},
new String[] {"a", "b", "c"});
static {
testData.add(Row.of(1, 1L, "Hi"));
testData.add(Row.of(2, 2L, "Hello"));
testData.add(Row.of(3, 2L, "Hello world"));
testData.add(Row.of(3, 3L, "Hello world!"));
}
// ------------------------------- Utilities -------------------------------------------------
/**
 * A {@link ScalarFunction} that turns a byte array into a string by delegating to
 * {@link Bytes#toString(byte[])}.
 */
public static class ToUTF8 extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    /** Returns the string form of {@code bytes} as produced by {@link Bytes#toString(byte[])}. */
    public String eval(byte[] bytes) {
        final String decoded = Bytes.toString(bytes);
        return decoded;
    }
}
/**
 * A {@link ScalarFunction} that turns a byte array into a {@code long} by delegating to
 * {@link Bytes#toLong(byte[])}.
 */
public static class ToLong extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    /** Returns the {@code long} value of {@code bytes} as produced by {@link Bytes#toLong(byte[])}. */
    public long eval(byte[] bytes) {
        final long decoded = Bytes.toLong(bytes);
        return decoded;
    }
}
/**
 * Builds the {@code CREATE TABLE} statement that maps an HBase table through the
 * {@code hbase-1.4} connector, using {@code tableName} both as the SQL table name and as the
 * connector's {@code table-name} option.
 *
 * @param tableName name used for the SQL table and for the HBase table
 * @param testTimeAndDecimalTypes when {@code true}, a {@code family4} column with TIMESTAMP(3),
 *     DATE, TIME(3) and DECIMAL(12, 4) qualifiers is appended after {@code family3}
 * @return the DDL statement
 */
private String createHBaseTableDDL(String tableName, boolean testTimeAndDecimalTypes) {
    // Optional trailing column; empty when the time/decimal types are not under test.
    final String family4Column =
            testTimeAndDecimalTypes
                    ? ", family4 ROW<col1 TIMESTAMP(3)"
                            + ", col2 DATE"
                            + ", col3 TIME(3)"
                            + ", col4 DECIMAL(12, 4)"
                            + "> \n"
                    : "";

    final StringBuilder ddl = new StringBuilder("CREATE TABLE ");
    ddl.append(tableName)
            .append("(\n")
            .append(" rowkey INT,")
            .append(" family1 ROW<col1 INT>,\n")
            .append(" family2 ROW<col1 VARCHAR, col2 BIGINT>,\n")
            .append(" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>")
            .append(family4Column)
            .append(") WITH (\n")
            .append(" 'connector' = 'hbase-1.4',\n")
            .append(" 'table-name' = '")
            .append(tableName)
            .append("',\n")
            .append(" 'zookeeper.quorum' = '")
            .append(getZookeeperQuorum())
            .append("',\n")
            .append(" 'zookeeper.znode.parent' = '/hbase' ")
            .append(")");
    return ddl.toString();
}
}