[FLINK-36188] Fix disabled buffer flush not taking effect (#49)
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 9a2736e..9f1109d 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -665,6 +665,66 @@
sinkFunction.close();
}
+ /**
+  * Checks the HBase table sink when both {@code sink.buffer-flush.max-size} and {@code
+  * sink.buffer-flush.max-rows} are set to {@code 0}.
+  *
+  * <p>The test seeds row key 1 with {@code col1 = 1}, then runs a lookup join that, for each of
+  * five input rows, reads {@code col1} of that key from the same HBase table and writes back
+  * {@code col1 + 1}. The final value is expected to be 6, which presumably requires every
+  * write to be visible to the lookup performed for the following row.
+  */
+ @Test
+ void testTableSinkDisabledBufferFlush() throws Exception {
+     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+     StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+     // Sink (and lookup) table with both buffer-flush thresholds set to 0.
+     tEnv.executeSql(
+             "CREATE TABLE hTableForSink ("
+                     + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                     + " family1 ROW<col1 INT>"
+                     + ") WITH ("
+                     + " 'connector' = 'hbase-2.2',"
+                     + " 'sink.buffer-flush.max-size' = '0',"
+                     + " 'sink.buffer-flush.max-rows' = '0',"
+                     + " 'table-name' = '"
+                     + TEST_TABLE_7
+                     + "',"
+                     + " 'zookeeper.quorum' = '"
+                     + getZookeeperQuorum()
+                     + "'"
+                     + ")");
+
+     // Seed the counter row and wait until it has been written.
+     String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
+     tEnv.executeSql(insert).await();
+
+     // Five rows that all target row key 1, each carrying a processing-time attribute.
+     tEnv.executeSql(
+             "CREATE VIEW user_click AS "
+                     + " SELECT user_id, proctime() AS proc_time"
+                     + " FROM ( "
+                     + " VALUES(1), (1), (1), (1), (1)"
+                     + " ) AS t (user_id);");
+
+     // Block until the lookup-join job has finished: executeSql() only submits the INSERT
+     // job, so without await() the verification query below could scan the table before
+     // all five updates have been written, making the assertion racy.
+     tEnv.executeSql(
+                     "INSERT INTO hTableForSink SELECT "
+                             + " user_id as rowkey,"
+                             + " ROW(CAST(family1.col1 + 1 AS INT))"
+                             + " FROM user_click INNER JOIN hTableForSink"
+                             + " FOR SYSTEM_TIME AS OF user_click.proc_time"
+                             + " ON hTableForSink.rowkey = user_click.user_id;")
+             .await();
+
+     // Separate table definition over the same HBase table, used only to read the result.
+     tEnv.executeSql(
+             "CREATE TABLE hTableForQuery ("
+                     + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                     + " family1 ROW<col1 INT>"
+                     + ") WITH ("
+                     + " 'connector' = 'hbase-2.2',"
+                     + " 'table-name' = '"
+                     + TEST_TABLE_7
+                     + "',"
+                     + " 'zookeeper.quorum' = '"
+                     + getZookeeperQuorum()
+                     + "'"
+                     + ")");
+     String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+     // Seed value 1 plus five increments.
+     TableResult firstResult = tEnv.executeSql(query);
+     List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());
+     String firstExpected = "+I[1, 6]";
+     TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+ }
+
private void verifyHBaseLookupJoin(Caching caching, boolean async) {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 6ea08bb..0195900 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -46,6 +46,7 @@
protected static final String TEST_TABLE_4 = "testTable4";
protected static final String TEST_TABLE_5 = "testTable5";
protected static final String TEST_TABLE_6 = "testTable6";
+ protected static final String TEST_TABLE_7 = "testTable7";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
@@ -100,6 +101,7 @@
createHBaseTable4();
createHBaseTable5();
createHBaseTable6();
+ createHBaseTable7();
createEmptyHBaseTable();
}
@@ -262,6 +264,13 @@
createTable(tableName, families, SPLIT_KEYS);
}
+ /**
+  * Calls {@code createTable} for {@code TEST_TABLE_7} with {@code FAMILY1} as the only column
+  * family, passing {@code SPLIT_KEYS} as the split keys argument.
+  */
+ private static void createHBaseTable7() {
+     createTable(
+             TableName.valueOf(TEST_TABLE_7),
+             new byte[][] {Bytes.toBytes(FAMILY1)},
+             SPLIT_KEYS);
+ }
+
private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 0ffad05..fbe8dcd 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -209,6 +209,8 @@
if (bufferFlushMaxMutations > 0
&& numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
flush();
+ } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) {
+ flush();
}
}