[FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
This closes #17699
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 501d470..62a7d63 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -302,10 +302,12 @@
List<String> expected =
Arrays.asList(
+ "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
+ "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
@@ -435,10 +437,12 @@
List<String> expected =
Arrays.asList(
+ "+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
+ "+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));