[FLINK-30360][Connectors/HBase] When specified the partial columns, ignore all empty columns
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 1ad2dd2..38046bd 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -281,6 +281,61 @@
}
@Test
+ public void testPartialColumnTableSink() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ // register HBase table testTable1 which contains test data
+ String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+ tEnv.executeSql(table1DDL);
+
+ String table2DDL = createHBaseTableDDL(TEST_TABLE_4, false);
+ tEnv.executeSql(table2DDL);
+
+ String query =
+ "INSERT INTO "
+ + TEST_TABLE_4
+ + " (rowkey, family3) "
+ + " SELECT"
+ + " rowkey,"
+ + " family3"
+ + " FROM "
+ + TEST_TABLE_1;
+
+ tEnv.executeSql(query).await();
+
+ // start a batch scan job to verify contents in HBase table
+ TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
+ batchEnv.executeSql(table2DDL);
+
+ Table table =
+ batchEnv.sqlQuery(
+ "SELECT "
+ + " h.rowkey, "
+ + " h.family1.col1, "
+ + " h.family2.col1, "
+ + " h.family2.col2, "
+ + " h.family3.col1, "
+ + " h.family3.col2, "
+ + " h.family3.col3 "
+ + "FROM "
+ + TEST_TABLE_4
+ + " AS h");
+ List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
+ String expected =
+ "+I[1, null, null, null, 1.01, false, Welt-1]\n"
+ + "+I[2, null, null, null, 2.02, true, Welt-2]\n"
+ + "+I[3, null, null, null, 3.03, false, Welt-3]\n"
+ + "+I[4, null, null, null, 4.04, true, Welt-4]\n"
+ + "+I[5, null, null, null, 5.05, false, Welt-5]\n"
+ + "+I[6, null, null, null, 6.06, true, Welt-6]\n"
+ + "+I[7, null, null, null, 7.07, false, Welt-7]\n"
+ + "+I[8, null, null, null, 8.08, true, Welt-8]\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index cc2de79..7a589d4 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -43,6 +43,7 @@
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
+ protected static final String TEST_TABLE_4 = "testTable4";
protected static final String ROW_KEY = "rowkey";
@@ -92,6 +93,7 @@
createHBaseTable1();
createHBaseTable2();
createHBaseTable3();
+ createHBaseTable4();
}
private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,12 @@
createTable(tableName, families, SPLIT_KEYS);
}
+ private static void createHBaseTable4() {
+ // create a table
+ TableName tableName = TableName.valueOf(TEST_TABLE_4);
+ createTable(tableName, FAMILIES, SPLIT_KEYS);
+ }
+
private static Put putRow(
int rowKey,
int f1c1,
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index e75d0d0..572b876 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -312,6 +312,61 @@
}
@Test
+ public void testPartialColumnTableSink() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ // register HBase table testTable1 which contains test data
+ String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+ tEnv.executeSql(table1DDL);
+
+ String table2DDL = createHBaseTableDDL(TEST_TABLE_4, false);
+ tEnv.executeSql(table2DDL);
+
+ String query =
+ "INSERT INTO "
+ + TEST_TABLE_4
+ + " (rowkey, family3) "
+ + " SELECT"
+ + " rowkey,"
+ + " family3"
+ + " FROM "
+ + TEST_TABLE_1;
+
+ tEnv.executeSql(query).await();
+
+ // start a batch scan job to verify contents in HBase table
+ TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
+ batchEnv.executeSql(table2DDL);
+
+ Table table =
+ batchEnv.sqlQuery(
+ "SELECT "
+ + " h.rowkey, "
+ + " h.family1.col1, "
+ + " h.family2.col1, "
+ + " h.family2.col2, "
+ + " h.family3.col1, "
+ + " h.family3.col2, "
+ + " h.family3.col3 "
+ + "FROM "
+ + TEST_TABLE_4
+ + " AS h");
+ List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
+ String expected =
+ "+I[1, null, null, null, 1.01, false, Welt-1]\n"
+ + "+I[2, null, null, null, 2.02, true, Welt-2]\n"
+ + "+I[3, null, null, null, 3.03, false, Welt-3]\n"
+ + "+I[4, null, null, null, 4.04, true, Welt-4]\n"
+ + "+I[5, null, null, null, 5.05, false, Welt-5]\n"
+ + "+I[6, null, null, null, 6.06, true, Welt-6]\n"
+ + "+I[7, null, null, null, 7.07, false, Welt-7]\n"
+ + "+I[8, null, null, null, 8.08, true, Welt-8]\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1e639ba..331aabf 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -43,6 +43,7 @@
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
+ protected static final String TEST_TABLE_4 = "testTable4";
protected static final String ROW_KEY = "rowkey";
@@ -92,6 +93,7 @@
createHBaseTable1();
createHBaseTable2();
createHBaseTable3();
+ createHBaseTable4();
}
private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,12 @@
createTable(tableName, families, SPLIT_KEYS);
}
+ private static void createHBaseTable4() {
+ // create a table
+ TableName tableName = TableName.valueOf(TEST_TABLE_4);
+ createTable(tableName, FAMILIES, SPLIT_KEYS);
+ }
+
private static Put putRow(
int rowKey,
int f1c1,
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index d21cc4a..a8863ff 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -140,6 +140,9 @@
// get family key
byte[] familyKey = families[f];
RowData familyRow = row.getRow(i, qualifiers[f].length);
+ if (familyRow == null) {
+ continue;
+ }
for (int q = 0; q < this.qualifiers[f].length; q++) {
// get quantifier key
byte[] qualifier = qualifiers[f][q];