[FLINK-30360][Connectors/HBase] When specified the partial columns, ignore all empty columns
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 1ad2dd2..38046bd 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -281,6 +281,61 @@
     }
 
     @Test
+    public void testPartialColumnTableSink() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        // register HBase table testTable1 which contains test data
+        String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+        tEnv.executeSql(table1DDL);
+
+        String table2DDL = createHBaseTableDDL(TEST_TABLE_4, false);
+        tEnv.executeSql(table2DDL);
+
+        String query =
+                "INSERT INTO "
+                        + TEST_TABLE_4
+                        + " (rowkey, family3) "
+                        + " SELECT"
+                        + " rowkey,"
+                        + " family3"
+                        + " FROM "
+                        + TEST_TABLE_1;
+
+        tEnv.executeSql(query).await();
+
+        // start a batch scan job to verify contents in HBase table
+        TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
+        batchEnv.executeSql(table2DDL);
+
+        Table table =
+                batchEnv.sqlQuery(
+                        "SELECT "
+                                + "  h.rowkey, "
+                                + "  h.family1.col1, "
+                                + "  h.family2.col1, "
+                                + "  h.family2.col2, "
+                                + "  h.family3.col1, "
+                                + "  h.family3.col2, "
+                                + "  h.family3.col3 "
+                                + "FROM "
+                                + TEST_TABLE_4
+                                + " AS h");
+        List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
+        String expected =
+                "+I[1, null, null, null, 1.01, false, Welt-1]\n"
+                        + "+I[2, null, null, null, 2.02, true, Welt-2]\n"
+                        + "+I[3, null, null, null, 3.03, false, Welt-3]\n"
+                        + "+I[4, null, null, null, 4.04, true, Welt-4]\n"
+                        + "+I[5, null, null, null, 5.05, false, Welt-5]\n"
+                        + "+I[6, null, null, null, 6.06, true, Welt-6]\n"
+                        + "+I[7, null, null, null, 7.07, false, Welt-7]\n"
+                        + "+I[8, null, null, null, 8.08, true, Welt-8]\n";
+
+        TestBaseUtils.compareResultAsText(results, expected);
+    }
+
+    @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index cc2de79..7a589d4 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -43,6 +43,7 @@
     protected static final String TEST_TABLE_1 = "testTable1";
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
+    protected static final String TEST_TABLE_4 = "testTable4";
 
     protected static final String ROW_KEY = "rowkey";
 
@@ -92,6 +93,7 @@
         createHBaseTable1();
         createHBaseTable2();
         createHBaseTable3();
+        createHBaseTable4();
     }
 
     private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,12 @@
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable4() {
+        // create a table
+        TableName tableName = TableName.valueOf(TEST_TABLE_4);
+        createTable(tableName, FAMILIES, SPLIT_KEYS);
+    }
+
     private static Put putRow(
             int rowKey,
             int f1c1,
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index e75d0d0..572b876 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -312,6 +312,61 @@
     }
 
     @Test
+    public void testPartialColumnTableSink() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        // register HBase table testTable1 which contains test data
+        String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+        tEnv.executeSql(table1DDL);
+
+        String table2DDL = createHBaseTableDDL(TEST_TABLE_4, false);
+        tEnv.executeSql(table2DDL);
+
+        String query =
+                "INSERT INTO "
+                        + TEST_TABLE_4
+                        + " (rowkey, family3) "
+                        + " SELECT"
+                        + " rowkey,"
+                        + " family3"
+                        + " FROM "
+                        + TEST_TABLE_1;
+
+        tEnv.executeSql(query).await();
+
+        // start a batch scan job to verify contents in HBase table
+        TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
+        batchEnv.executeSql(table2DDL);
+
+        Table table =
+                batchEnv.sqlQuery(
+                        "SELECT "
+                                + "  h.rowkey, "
+                                + "  h.family1.col1, "
+                                + "  h.family2.col1, "
+                                + "  h.family2.col2, "
+                                + "  h.family3.col1, "
+                                + "  h.family3.col2, "
+                                + "  h.family3.col3 "
+                                + "FROM "
+                                + TEST_TABLE_4
+                                + " AS h");
+        List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
+        String expected =
+                "+I[1, null, null, null, 1.01, false, Welt-1]\n"
+                        + "+I[2, null, null, null, 2.02, true, Welt-2]\n"
+                        + "+I[3, null, null, null, 3.03, false, Welt-3]\n"
+                        + "+I[4, null, null, null, 4.04, true, Welt-4]\n"
+                        + "+I[5, null, null, null, 5.05, false, Welt-5]\n"
+                        + "+I[6, null, null, null, 6.06, true, Welt-6]\n"
+                        + "+I[7, null, null, null, 7.07, false, Welt-7]\n"
+                        + "+I[8, null, null, null, 8.08, true, Welt-8]\n";
+
+        TestBaseUtils.compareResultAsText(results, expected);
+    }
+
+    @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1e639ba..331aabf 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -43,6 +43,7 @@
     protected static final String TEST_TABLE_1 = "testTable1";
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
+    protected static final String TEST_TABLE_4 = "testTable4";
 
     protected static final String ROW_KEY = "rowkey";
 
@@ -92,6 +93,7 @@
         createHBaseTable1();
         createHBaseTable2();
         createHBaseTable3();
+        createHBaseTable4();
     }
 
     private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,12 @@
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable4() {
+        // create a table
+        TableName tableName = TableName.valueOf(TEST_TABLE_4);
+        createTable(tableName, FAMILIES, SPLIT_KEYS);
+    }
+
     private static Put putRow(
             int rowKey,
             int f1c1,
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index d21cc4a..a8863ff 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -140,6 +140,9 @@
                 // get family key
                 byte[] familyKey = families[f];
                 RowData familyRow = row.getRow(i, qualifiers[f].length);
+                if (familyRow == null) {
+                    continue;
+                }
                 for (int q = 0; q < this.qualifiers[f].length; q++) {
                     // get quantifier key
                     byte[] qualifier = qualifiers[f][q];