[FLINK-35233] Fix lookup cache reuse RowData object problem (#47)

* fix: the lookup function returned the result of convertToReusedRow() by default, which is a reused RowData object. When lookup.cache is enabled, that reused object is what gets stored in the cache, so all cached values end up being the same object. The lookup function now returns convertToNewRow() instead, so each cached entry is a distinct row.

* [FLINK-35233] Fix lookup cache reuse RowData object problem

---------

Co-authored-by: xiekunyuan <xiekunyuan@meizu.com>
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index a946edd..9a2736e 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -48,6 +48,8 @@
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -591,14 +593,16 @@
         assertThat(result).isEqualTo(expected);
     }
 
-    @Test
-    void testHBaseLookupTableSource() {
-        verifyHBaseLookupJoin(false);
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testHBaseLookupTableSource(Caching caching) {
+        verifyHBaseLookupJoin(caching, false);
     }
 
-    @Test
-    void testHBaseAsyncLookupTableSource() {
-        verifyHBaseLookupJoin(true);
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testHBaseAsyncLookupTableSource(Caching caching) {
+        verifyHBaseLookupJoin(caching, true);
     }
 
     @Test
@@ -661,10 +665,22 @@
         sinkFunction.close();
     }
 
-    private void verifyHBaseLookupJoin(boolean async) {
+    private void verifyHBaseLookupJoin(Caching caching, boolean async) {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
 
+        String cacheOptions = "";
+        if (caching == Caching.ENABLE_CACHE) {
+            cacheOptions =
+                    ","
+                            + String.join(
+                                    ",",
+                                    Arrays.asList(
+                                            "'lookup.cache' = 'PARTIAL'",
+                                            "'lookup.partial-cache.max-rows' = '1000'",
+                                            "'lookup.partial-cache.expire-after-write' = '10min'"));
+        }
+
         tEnv.executeSql(
                 "CREATE TABLE "
                         + TEST_TABLE_1
@@ -686,6 +702,7 @@
                         + " 'zookeeper.quorum' = '"
                         + getZookeeperQuorum()
                         + "'"
+                        + cacheOptions
                         + ")");
 
         // prepare a source table
@@ -725,6 +742,8 @@
         expected.add(
                 "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
         expected.add(
+                "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
+        expected.add(
                 "+I[2, 2, 20, Hello-2, 200, 2.02, true, Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002]");
         expected.add(
                 "+I[3, 2, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]");
@@ -750,6 +769,12 @@
         testData.add(Row.of(2, 2L, "Hello"));
         testData.add(Row.of(3, 2L, "Hello world"));
         testData.add(Row.of(3, 3L, "Hello world!"));
+        testData.add(Row.of(1, 1L, "Hi")); // lookup one more time
+    }
+
+    private enum Caching {
+        ENABLE_CACHE,
+        DISABLE_CACHE
     }
 
     // ------------------------------- Utilities -------------------------------------------------
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 0e1ba54..59edf30 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -97,7 +97,7 @@
                 if (get != null) {
                     Result result = table.get(get);
                     if (!result.isEmpty()) {
-                        return Collections.singletonList(serde.convertToReusedRow(result));
+                        return Collections.singletonList(serde.convertToNewRow(result));
                     }
                 }
                 break;