[FLINK-35233] Fix lookup cache reuse RowData object problem (#47)
* fix: the lookup function returned the result of convertToReusedRow() by default, and that result is a reused RowData object. If lookup.cache is enabled, this reused object is stored in the external cache, so all cached values end up being the same object. The lookup now returns convertToNewRow() instead, so each cached entry holds its own row.
* [FLINK-35233] Fix lookup cache reuse RowData object problem
---------
Co-authored-by: xiekunyuan <xiekunyuan@meizu.com>
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index a946edd..9a2736e 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -48,6 +48,8 @@
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -591,14 +593,16 @@
assertThat(result).isEqualTo(expected);
}
- @Test
- void testHBaseLookupTableSource() {
- verifyHBaseLookupJoin(false);
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testHBaseLookupTableSource(Caching caching) {
+ verifyHBaseLookupJoin(caching, false);
}
- @Test
- void testHBaseAsyncLookupTableSource() {
- verifyHBaseLookupJoin(true);
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testHBaseAsyncLookupTableSource(Caching caching) {
+ verifyHBaseLookupJoin(caching, true);
}
@Test
@@ -661,10 +665,22 @@
sinkFunction.close();
}
- private void verifyHBaseLookupJoin(boolean async) {
+ private void verifyHBaseLookupJoin(Caching caching, boolean async) {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+ String cacheOptions = "";
+ if (caching == Caching.ENABLE_CACHE) {
+ cacheOptions =
+ ","
+ + String.join(
+ ",",
+ Arrays.asList(
+ "'lookup.cache' = 'PARTIAL'",
+ "'lookup.partial-cache.max-rows' = '1000'",
+ "'lookup.partial-cache.expire-after-write' = '10min'"));
+ }
+
tEnv.executeSql(
"CREATE TABLE "
+ TEST_TABLE_1
@@ -686,6 +702,7 @@
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ + cacheOptions
+ ")");
// prepare a source table
@@ -725,6 +742,8 @@
expected.add(
"+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
expected.add(
+ "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
+ expected.add(
"+I[2, 2, 20, Hello-2, 200, 2.02, true, Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002]");
expected.add(
"+I[3, 2, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]");
@@ -750,6 +769,12 @@
testData.add(Row.of(2, 2L, "Hello"));
testData.add(Row.of(3, 2L, "Hello world"));
testData.add(Row.of(3, 3L, "Hello world!"));
+ testData.add(Row.of(1, 1L, "Hi")); // lookup one more time
+ }
+
+ private enum Caching {
+ ENABLE_CACHE,
+ DISABLE_CACHE
}
// ------------------------------- Utilities -------------------------------------------------
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 0e1ba54..59edf30 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -97,7 +97,7 @@
if (get != null) {
Result result = table.get(get);
if (!result.isEmpty()) {
- return Collections.singletonList(serde.convertToReusedRow(result));
+ return Collections.singletonList(serde.convertToNewRow(result));
}
}
break;