[FLINK-33083] Update logical plans for tests
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
index f916d6b..89299b2 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
@@ -45,7 +45,7 @@
LogicalSink(table=[default_catalog.default_database.MySink], targetColumns=[[0],[1],[2]], fields=[a, b, filemeta])
+- LogicalProject(a=[$0], b=[$1], filemeta=[$3])
+- LogicalProject(a=[$0], b=[$1], c=[$2], filemeta=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWithMeta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWithMeta, metadata=[file.path]]])
]]>
</Resource>
<Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
index a4599e3..9806eb1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
@@ -375,13 +375,13 @@
+- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
+- LogicalProject(a=[$0], b=[$1], my_time=[$4], d1=[CAST($3):DECIMAL(28, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
+- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
+- LogicalProject(a=[$0], update_time=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -403,13 +403,13 @@
+- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
+- LogicalProject(a=[$0], b=[$1], ts1=[$4], d1=[CAST($3):DECIMAL(28, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
+- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
+- LogicalProject(a=[$0], update_time=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 556f880..9813db3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -109,7 +109,7 @@
<![CDATA[
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out
index ca32541..6d2ab5c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out
@@ -9,12 +9,12 @@
},
"abilities" : [ {
"type" : "ReadingMetadata",
- "metadataKeys" : [ "ts", "tags" ],
- "producedType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `ts` TIMESTAMP(3), `tags` VARCHAR(1)> NOT NULL"
+ "metadataKeys" : [ "tags", "ts" ],
+ "producedType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `tags` VARCHAR(1), `ts` TIMESTAMP(3)> NOT NULL"
} ]
},
- "outputType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `ts` TIMESTAMP(3), `tags` VARCHAR(1)>",
- "description" : "TableSourceScan(table=[[default_catalog, default_database, src, metadata=[ts, tags]]], fields=[x, y, ts, tags])",
+ "outputType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `tags` VARCHAR(1), `ts` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, src, metadata=[tags, ts]]], fields=[x, y, tags, ts])",
"inputProperties" : [ ]
}, {
"id" : 2,
@@ -25,7 +25,7 @@
"type" : "VARCHAR(2147483647)"
}, {
"kind" : "INPUT_REF",
- "inputIndex" : 2,
+ "inputIndex" : 3,
"type" : "TIMESTAMP(3)"
} ],
"condition" : null,
@@ -76,7 +76,7 @@
"internalName" : "$CAST$1",
"operands" : [ {
"kind" : "INPUT_REF",
- "inputIndex" : 3,
+ "inputIndex" : 2,
"type" : "VARCHAR(1)"
} ],
"type" : "VARCHAR(2147483647)"
@@ -146,4 +146,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
index ea87803..0daab2d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
@@ -1274,14 +1274,14 @@
: +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
: +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
: +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
+- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)])
+- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1320,14 +1320,14 @@
: +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
: +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
: +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
+- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)], a2=[MIN($4)], metadata_0=[MIN($5)])
+- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2], a2=[$3], metadata_0=[$0])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
index 710ffa0..3c90cf0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
@@ -79,14 +79,14 @@
<![CDATA[
LogicalProject(m1=[$0], metadata=[$1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[m1, m2, m3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(m1=[$0], metadata=[$1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[m1, m2, m3]]])
]]>
</Resource>
</TestCase>
@@ -98,14 +98,14 @@
<![CDATA[
LogicalProject(EXPR$0=[1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T4]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[m1, m2, m3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T4]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[m1, m2, m3]]])
]]>
</Resource>
</TestCase>
@@ -117,14 +117,14 @@
<![CDATA[
LogicalProject(m1=[$0], metadata=[$1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[m1, m2, m3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(m1=[$0], metadata=[$1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[m1, m2, m3]]])
]]>
</Resource>
</TestCase>
@@ -136,14 +136,14 @@
<![CDATA[
LogicalProject(EXPR$0=[1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[m1, m2, m3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[1])
+- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[m1, m2, m3]]])
]]>
</Resource>
</TestCase>
@@ -230,7 +230,7 @@
<![CDATA[
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3], metadata_3=[CAST($2):BIGINT])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -251,7 +251,7 @@
<![CDATA[
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3], metadata_3=[CAST($2):BIGINT])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -287,7 +287,7 @@
<![CDATA[
LogicalProject(metadata=[$1], f1=[$0])
+- LogicalProject(f1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T5]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[m2, m3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -305,7 +305,7 @@
<![CDATA[
LogicalProject(metadata=[$1])
+- LogicalProject(f1=[$0], metadata=[$1], m3=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, T5]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[m2, m3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -340,7 +340,7 @@
<![CDATA[
LogicalProject(id=[$0], metadata_3=[$4], metadata_1=[$2])
+- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3], metadata_3=[CAST($2):BIGINT])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
index 910b303..9d65bfeb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
@@ -16,6 +16,60 @@
limitations under the License.
-->
<Root>
+ <TestCase name="testWatermarkAlignmentWithHint">
+ <Resource name="sql">
+ <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.alignment.group'='group1', 'scan.watermark.alignment.max-drift'='1min') */]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIdleSourceWithHint">
+ <Resource name="sql">
+ <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.idle-timeout' = '60s')*/]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.idle-timeout=60s}]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.idle-timeout=60s}]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIdleSourceWithOptions">
+ <Resource name="sql">
+ <![CDATA[select a, c from MyTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSimpleWatermark">
<Resource name="sql">
<![CDATA[select a, c from MyTable]]>
@@ -34,6 +88,60 @@
]]>
</Resource>
</TestCase>
+ <TestCase name="testWatermarkAlignmentWithOptions">
+ <Resource name="sql">
+ <![CDATA[select a, c from MyTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWatermarkEmitStrategyWithHint">
+ <Resource name="sql">
+ <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.emit.strategy'='on-event') */]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.emit.strategy=on-event}]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.emit.strategy=on-event}]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWatermarkEmitStrategyWithOptions">
+ <Resource name="sql">
+ <![CDATA[select a, c from MyTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testWatermarkOnComputedColumn">
<Resource name="sql">
<![CDATA[SELECT * from MyTable]]>
@@ -105,14 +213,14 @@
LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4])
+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, CAST(+($3, $4)):INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[CAST($3):BIGINT], computed=[+(CAST($3):BIGINT, $1)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, metadata=[metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, b, c, metadata, computed])
+- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata AS BIGINT) AS metadata, +(CAST(metadata AS BIGINT), b) AS computed])
- +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata AS BIGINT), +(CAST(metadata AS BIGINT), b)) AS INTERVAL SECOND))], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, metadata])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, metadata=[metadata_2], watermark=[-(c, CAST(+(CAST(metadata AS BIGINT), +(CAST(metadata AS BIGINT), b)) AS INTERVAL SECOND))], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, metadata])
]]>
</Resource>
</TestCase>
@@ -215,112 +323,4 @@
]]>
</Resource>
</TestCase>
- <TestCase name="testWatermarkEmitStrategyWithOptions">
- <Resource name="sql">
- <![CDATA[select a, c from MyTable]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testWatermarkEmitStrategyWithHint">
- <Resource name="sql">
- <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.emit.strategy'='on-event') */]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.emit.strategy=on-event}]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.emit.strategy=on-event}]]])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testWatermarkAlignmentWithOptions">
- <Resource name="sql">
- <![CDATA[select a, c from MyTable]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testWatermarkAlignmentWithHint">
- <Resource name="sql">
- <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.alignment.group'='group1', 'scan.watermark.alignment.max-drift'='1min') */]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testIdleSourceWithOptions">
- <Resource name="sql">
- <![CDATA[select a, c from MyTable]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testIdleSourceWithHint">
- <Resource name="sql">
- <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.idle-timeout' = '60s')*/]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.idle-timeout=60s}]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.idle-timeout=60s}]]])
-]]>
- </Resource>
- </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index 118dc7f..f34df4c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -296,7 +296,7 @@
LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()])
+- LogicalProject(id=[$0], name=[$1])
+- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], type=[$4], metadata_1=[$5], metadata_2=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -325,7 +325,7 @@
+- LogicalProject(name=[$1], type=[$4], amount=[$2], metadata_1=[$5])
+- LogicalFilter(condition=[=($0, 123)])
+- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], type=[$4], metadata_1=[$5], metadata_2=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
index 32fc618..7a90528 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
@@ -87,6 +87,44 @@
]]>
</Resource>
</TestCase>
+ <TestCase name="testPushdownCalcNotAffectChangelogNormalizeKey">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1.a, t1.b, t2.f
+FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
+ ON t1.a = t2.a WHERE t2.f = true
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$1], b=[$2], f=[$6])
++- LogicalFilter(condition=[=($6, true)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
+ :- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$0])
+ : +- LogicalProject(ingestion_time=[CAST($2):TIMESTAMP(3) *ROWTIME*], a=[$0], b=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, t1, metadata=[ts]]])
+ +- LogicalFilter(condition=[=($cor0.a, $2)])
+ +- LogicalSnapshot(period=[$cor0.ingestion_time])
+ +- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$1])
+ +- LogicalProject(k=[$0], ingestion_time=[CAST($3):TIMESTAMP(3) *ROWTIME*], a=[$1], f=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t2, metadata=[ts]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, f], where=[f], changelogMode=[I])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, ingestion_time0, a0, f], changelogMode=[I])
+ :- Exchange(distribution=[hash[a]], changelogMode=[I])
+ : +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I])
+ : +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, b], changelogMode=[I])
+ : +- TableSourceScan(table=[[default_catalog, default_database, t1, metadata=[ts]]], fields=[a, b, ingestion_time], changelogMode=[I])
+ +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
+ +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I,UA,D])
+ +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, f], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, ingestion_time], changelogMode=[I,UA,D])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testPushdownNewCalcAndWatermarkAssignerWithCalc">
<Resource name="sql">
<![CDATA[
@@ -188,42 +226,4 @@
]]>
</Resource>
</TestCase>
- <TestCase name="testPushdownCalcNotAffectChangelogNormalizeKey">
- <Resource name="sql">
- <![CDATA[
-SELECT t1.a, t1.b, t2.f
-FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
- ON t1.a = t2.a WHERE t2.f = true
-]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$1], b=[$2], f=[$6])
-+- LogicalFilter(condition=[=($6, true)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
- :- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$0])
- : +- LogicalProject(ingestion_time=[CAST($2):TIMESTAMP(3) *ROWTIME*], a=[$0], b=[$1])
- : +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
- +- LogicalFilter(condition=[=($cor0.a, $2)])
- +- LogicalSnapshot(period=[$cor0.ingestion_time])
- +- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$1])
- +- LogicalProject(k=[$0], ingestion_time=[CAST($3):TIMESTAMP(3) *ROWTIME*], a=[$1], f=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, b, f], where=[f], changelogMode=[I])
-+- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, ingestion_time0, a0, f], changelogMode=[I])
- :- Exchange(distribution=[hash[a]], changelogMode=[I])
- : +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I])
- : +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, b], changelogMode=[I])
- : +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b, ingestion_time], changelogMode=[I])
- +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
- +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I,UA,D])
- +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, f], changelogMode=[I,UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, ingestion_time], changelogMode=[I,UA,D])
-]]>
- </Resource>
- </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index 53120e0..0fa4965 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -732,39 +732,6 @@
]]>
</Resource>
</TestCase>
- <TestCase name="testCdcLeftJoinDimWithNonDeterministicPreFilter[nonDeterministicUpdateStrategy=IGNORE]">
- <Resource name="sql">
- <![CDATA[
-insert into sink_with_pk
-select t1.a, t2.b as version, t2.c
-from (
- select *, proctime() proctime from cdc
-) t1 left join dim_with_pk for system_time as of t1.proctime as t2
-on t1.a = t2.a
- and t1.b > UNIX_TIMESTAMP() - 300
-]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c])
-+- LogicalProject(a=[$0], version=[$6], c=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1, 4}])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, -(UNIX_TIMESTAMP(), 300)))])
- +- LogicalSnapshot(period=[$cor0.proctime])
- +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c], upsertMaterialize=[true])
-+- Calc(select=[a, b0 AS version, c])
- +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c])
- +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testCdcJoinDimWithPkNonDeterministicLocalCondition[nonDeterministicUpdateStrategy=IGNORE]">
<Resource name="sql">
<![CDATA[
@@ -1128,6 +1095,39 @@
]]>
</Resource>
</TestCase>
+ <TestCase name="testCdcLeftJoinDimWithNonDeterministicPreFilter[nonDeterministicUpdateStrategy=IGNORE]">
+ <Resource name="sql">
+ <![CDATA[
+insert into sink_with_pk
+select t1.a, t2.b as version, t2.c
+from (
+ select *, proctime() proctime from cdc
+) t1 left join dim_with_pk for system_time as of t1.proctime as t2
+on t1.a = t2.a
+ and t1.b > UNIX_TIMESTAMP() - 300
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c])
++- LogicalProject(a=[$0], version=[$6], c=[$7])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1, 4}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
+ +- LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, -(UNIX_TIMESTAMP(), 300)))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c], upsertMaterialize=[true])
++- Calc(select=[a, b0 AS version, c])
+ +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c])
+ +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testCdcLeftJoinDimWithoutPkSinkWithoutPk[nonDeterministicUpdateStrategy=IGNORE]">
<Resource name="sql">
<![CDATA[
@@ -1529,7 +1529,7 @@
+- LogicalProject(a=[CAST($0):INTEGER], b=[CAST($4):BIGINT], c=[$7])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- : +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
+- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
]]>
</Resource>
@@ -1538,7 +1538,7 @@
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true])
+- Calc(select=[CAST(a AS INTEGER) AS a, CAST(metadata_1 AS BIGINT) AS b, EXPR$0 AS c])
+- Correlate(invocation=[str_split($cor0.c)], correlate=[table(str_split($cor0.c))], select=[a,b,c,d,metadata_1,metadata_2,metadata_3,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, BOOLEAN d, INTEGER metadata_1, VARCHAR(2147483647) metadata_2, BIGINT metadata_3, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
- +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3])
+ +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3])
]]>
</Resource>
</TestCase>
@@ -1555,7 +1555,7 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`legacy_retract_sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1579,7 +1579,7 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`legacy_upsert_sink`], fields=[a, b, c])
+- LogicalProject(a=[CAST($0):INTEGER], b=[$6], c=[CAST($2):VARCHAR(100) CHARACTER SET "UTF-16LE"])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1604,7 +1604,7 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`legacy_upsert_sink`], fields=[a, b, c])
+- LogicalProject(a=[CAST($0):INTEGER], b=[$6], c=[CAST($2):VARCHAR(100) CHARACTER SET "UTF-16LE"])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1628,7 +1628,7 @@
LogicalSink(table=[default_catalog.default_database.sink_with_composite_pk], fields=[a, b, c, e])
+- LogicalProject(a=[$0], b=[$1], c=[$2], e=[$5])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_3=[$4], e=[$4])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta_rename]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta_rename, metadata=[metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1651,7 +1651,7 @@
LogicalSink(table=[default_catalog.default_database.sink_with_composite_pk], fields=[a, b, c, metadata_3])
+- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_3=[$6])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1674,7 +1674,7 @@
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1698,7 +1698,7 @@
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -1723,7 +1723,7 @@
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -2511,14 +2511,14 @@
+- LogicalProject(a=[$1], b=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
+- LogicalProject(id=[$0], a=[$1.nested2.num], name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)])
+- LogicalProject(id=[$0], deepNested=[$1], name=[$2], metadata_1=[$3], metadata_2=[$4])
- +- LogicalTableScan(table=[[default_catalog, default_database, nested_src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, nested_src, metadata=[metadata_1, metadata_2]]])
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, b, d])
+- LogicalProject(a=[$1], b=[$2], d=[CAST($3):BIGINT])
+- LogicalFilter(condition=[>($3, 100)])
+- LogicalProject(id=[$0], a=[$1.nested2.num], name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)])
+- LogicalProject(id=[$0], deepNested=[$1], name=[$2], metadata_1=[$3], metadata_2=[$4])
- +- LogicalTableScan(table=[[default_catalog, default_database, nested_src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, nested_src, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -3073,7 +3073,7 @@
: +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_3=[$6])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -3105,7 +3105,7 @@
: +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -3137,7 +3137,7 @@
: +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_3=[$6])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index 6915a1c..5218853 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -138,7 +138,7 @@
LogicalProject(a=[$0], b=[$1])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$2], originTime=[$3], rowtime=[TO_TIMESTAMP_LTZ($3, 3)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyLtzTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyLtzTable, metadata=[originTime]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -176,7 +176,7 @@
LogicalProject(a=[$0], b=[$1])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$2], originTime=[$3], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME(/($3, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, metadata=[originTime]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
index b50b7e4..b0aeef1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
@@ -241,13 +241,13 @@
+- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
+- LogicalProject(a=[$0], b=[$1], ts1=[$4], d1=[CAST($3):DECIMAL(28, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
+- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
+- LogicalProject(a=[$0], update_time=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -471,13 +471,13 @@
+- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
+- LogicalProject(a=[$0], b=[$1], my_time=[$4], d1=[CAST($3):DECIMAL(28, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
+- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
+- LogicalProject(a=[$0], update_time=[$4])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
- +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 141570e..71d028c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -190,13 +190,13 @@
<![CDATA[
LogicalProject(a=[$0], other_metadata=[$1], b=[$2], c=[$3], metadata_1=[$4], computed=[$5])
+- LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metadata_1=[$3], computed=[UPPER($3)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, CAST(other_metadata AS INTEGER) AS other_metadata, b, c, metadata_1, UPPER(metadata_1) AS computed])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, metadata_1, other_metadata])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_3]]], fields=[a, b, c, metadata_1, other_metadata])
]]>
</Resource>
</TestCase>
@@ -208,7 +208,7 @@
<![CDATA[
LogicalProject(b=[$2], other_metadata=[$1])
+- LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metadata_1=[$3])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_3]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -226,13 +226,13 @@
<![CDATA[
LogicalProject(timestamp=[$0], metadata_timestamp=[$1], other=[$2], computed_other=[$3], computed_timestamp=[$4])
+- LogicalProject(timestamp=[$0], metadata_timestamp=[$2], other=[$1], computed_other=[UPPER($1)], computed_timestamp=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[other, timestamp]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, other, metadata_timestamp])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, metadata=[other, timestamp]]], fields=[timestamp, other, metadata_timestamp])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 92df169..fc1a7ca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -515,7 +515,7 @@
<![CDATA[
LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[a, b, c, metadata_1, metadata_2])
+- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_1=[$3], metadata_2=[CAST(CAST($4):INTEGER):BIGINT])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2, metadata_3]]])
]]>
</Resource>
<Resource name="optimized rel plan">
@@ -531,7 +531,7 @@
<![CDATA[
LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[metadata_1, metadata_2, other, metadata_23])
+- LogicalProject(metadata_1=[$0], metadata_2=[$1], other=[$2], metadata_23=[$4])
- +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 697fc02..d5ffd5f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -75,7 +75,7 @@
<![CDATA[
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -98,7 +98,7 @@
<![CDATA[
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, metadata=[metadata_1, metadata_2]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -213,13 +213,13 @@
<![CDATA[
LogicalProject(id=[$0], ts1=[$5], op=[$3])
+- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, CAST(op AS VARCHAR(2147483647)) AS op])
-+- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, ts]]], fields=[id, name, op, ts])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]], fields=[id, name, op, tags, ts])
]]>
</Resource>
</TestCase>
@@ -231,7 +231,7 @@
<![CDATA[
LogicalProject(id=[$0], ts1=[$5], op=[$3])
+- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -280,13 +280,13 @@
<![CDATA[
LogicalProject(ts=[$4], id=[$0], name=[$1], tags=[$2], op=[$3])
+- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[ts, id, name, CAST(tags AS VARCHAR(2147483647)) AS tags, CAST(op AS VARCHAR(2147483647)) AS op])
-+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, name, op, tags, ts])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]], fields=[id, name, op, tags, ts])
]]>
</Resource>
</TestCase>
@@ -298,13 +298,13 @@
<![CDATA[
LogicalProject(id=[$0], ts=[$4], tags=[$2])
+- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[id, ts, CAST(tags AS VARCHAR(2147483647)) AS tags])
-+- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[tags, ts]]], fields=[id, name, tags, ts])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]], fields=[id, name, op, tags, ts])
]]>
</Resource>
</TestCase>
@@ -335,7 +335,7 @@
<![CDATA[
LogicalProject(id=[$0], ts=[$4], tags=[$2])
+- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index d328316..55f72f3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -265,7 +265,7 @@
LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+- LogicalProject($f0=[0])
+- LogicalProject(sys_col=[$2], id=[$0], cnt=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[sys_col]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -286,7 +286,7 @@
LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+- LogicalProject($f0=[0])
+- LogicalProject(sys_col=[$2], id=[$1], cnt=[$0])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[cnt, id, sys_col]]])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -307,7 +307,7 @@
LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+- LogicalProject($f0=[0])
+- LogicalProject(nested=[$0], sys_col=[$3], id=[$1], cnt=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[sys_col]]])
]]>
</Resource>
<Resource name="optimized exec plan">