[FLINK-33083] Update logical plans for tests
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
index f916d6b..89299b2 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
@@ -45,7 +45,7 @@
 LogicalSink(table=[default_catalog.default_database.MySink], targetColumns=[[0],[1],[2]], fields=[a, b, filemeta])
 +- LogicalProject(a=[$0], b=[$1], filemeta=[$3])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], filemeta=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWithMeta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWithMeta, metadata=[file.path]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
index a4599e3..9806eb1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
@@ -375,13 +375,13 @@
 +- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
    +- LogicalProject(a=[$0], b=[$1], my_time=[$4], d1=[CAST($3):DECIMAL(28, 2)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 
 LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
 +- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
    +- LogicalProject(a=[$0], update_time=[$4])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -403,13 +403,13 @@
 +- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
    +- LogicalProject(a=[$0], b=[$1], ts1=[$4], d1=[CAST($3):DECIMAL(28, 2)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 
 LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
 +- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
    +- LogicalProject(a=[$0], update_time=[$4])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 556f880..9813db3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -109,7 +109,7 @@
       <![CDATA[
 LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
 +- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out
index ca32541..6d2ab5c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReuseSourceWithoutProjectionPushDown.out
@@ -9,12 +9,12 @@
       },
       "abilities" : [ {
         "type" : "ReadingMetadata",
-        "metadataKeys" : [ "ts", "tags" ],
-        "producedType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `ts` TIMESTAMP(3), `tags` VARCHAR(1)> NOT NULL"
+        "metadataKeys" : [ "tags", "ts" ],
+        "producedType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `tags` VARCHAR(1), `ts` TIMESTAMP(3)> NOT NULL"
       } ]
     },
-    "outputType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `ts` TIMESTAMP(3), `tags` VARCHAR(1)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, src, metadata=[ts, tags]]], fields=[x, y, ts, tags])",
+    "outputType" : "ROW<`x` VARCHAR(2147483647), `y` VARCHAR(2147483647), `tags` VARCHAR(1), `ts` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, src, metadata=[tags, ts]]], fields=[x, y, tags, ts])",
     "inputProperties" : [ ]
   }, {
     "id" : 2,
@@ -25,7 +25,7 @@
       "type" : "VARCHAR(2147483647)"
     }, {
       "kind" : "INPUT_REF",
-      "inputIndex" : 2,
+      "inputIndex" : 3,
       "type" : "TIMESTAMP(3)"
     } ],
     "condition" : null,
@@ -76,7 +76,7 @@
       "internalName" : "$CAST$1",
       "operands" : [ {
         "kind" : "INPUT_REF",
-        "inputIndex" : 3,
+        "inputIndex" : 2,
         "type" : "VARCHAR(1)"
       } ],
       "type" : "VARCHAR(2147483647)"
@@ -146,4 +146,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
index ea87803..0daab2d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
@@ -1274,14 +1274,14 @@
       :        +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
       :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
       :              +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
-      :                 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+      :                 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
       +- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)])
          +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2])
             +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
                +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
                   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
                      +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
-                        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+                        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1320,14 +1320,14 @@
       :        +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
       :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
       :              +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
-      :                 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+      :                 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
       +- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)], a2=[MIN($4)], metadata_0=[MIN($5)])
          +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2], a2=[$3], metadata_0=[$0])
             +- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
                +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5])
                   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
                      +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
-                        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+                        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, metadata=[metadata_0]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
index 710ffa0..3c90cf0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
@@ -79,14 +79,14 @@
       <![CDATA[
 LogicalProject(m1=[$0], metadata=[$1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(m1=[$0], metadata=[$1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
   </TestCase>
@@ -98,14 +98,14 @@
       <![CDATA[
 LogicalProject(EXPR$0=[1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T4]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(EXPR$0=[1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T4]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
   </TestCase>
@@ -117,14 +117,14 @@
       <![CDATA[
 LogicalProject(m1=[$0], metadata=[$1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(m1=[$0], metadata=[$1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
   </TestCase>
@@ -136,14 +136,14 @@
       <![CDATA[
 LogicalProject(EXPR$0=[1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(EXPR$0=[1])
 +- LogicalProject(m1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[m1, m2, m3]]])
 ]]>
     </Resource>
   </TestCase>
@@ -230,7 +230,7 @@
       <![CDATA[
 LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
 +- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3], metadata_3=[CAST($2):BIGINT])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -251,7 +251,7 @@
       <![CDATA[
 LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
 +- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3], metadata_3=[CAST($2):BIGINT])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -287,7 +287,7 @@
       <![CDATA[
 LogicalProject(metadata=[$1], f1=[$0])
 +- LogicalProject(f1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T5]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[m2, m3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -305,7 +305,7 @@
       <![CDATA[
 LogicalProject(metadata=[$1])
 +- LogicalProject(f1=[$0], metadata=[$1], m3=[$2])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T5]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[m2, m3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -340,7 +340,7 @@
       <![CDATA[
 LogicalProject(id=[$0], metadata_3=[$4], metadata_1=[$2])
 +- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3], metadata_3=[CAST($2):BIGINT])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
index 910b303..9d65bfeb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
@@ -16,6 +16,60 @@
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testWatermarkAlignmentWithHint">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.alignment.group'='group1', 'scan.watermark.alignment.max-drift'='1min') */]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIdleSourceWithHint">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.idle-timeout' = '60s')*/]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.idle-timeout=60s}]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.idle-timeout=60s}]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIdleSourceWithOptions">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSimpleWatermark">
     <Resource name="sql">
       <![CDATA[select a, c from MyTable]]>
@@ -34,6 +88,60 @@
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWatermarkAlignmentWithOptions">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWatermarkEmitStrategyWithHint">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.emit.strategy'='on-event') */]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.emit.strategy=on-event}]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.emit.strategy=on-event}]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWatermarkEmitStrategyWithOptions">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, c])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testWatermarkOnComputedColumn">
     <Resource name="sql">
       <![CDATA[SELECT * from MyTable]]>
@@ -105,14 +213,14 @@
 LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4])
 +- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, CAST(+($3, $4)):INTERVAL SECOND)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[CAST($3):BIGINT], computed=[+(CAST($3):BIGINT, $1)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, metadata=[metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, c, metadata, computed])
 +- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata AS BIGINT) AS metadata, +(CAST(metadata AS BIGINT), b) AS computed])
-   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata AS BIGINT), +(CAST(metadata AS BIGINT), b)) AS INTERVAL SECOND))], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, metadata])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, metadata=[metadata_2], watermark=[-(c, CAST(+(CAST(metadata AS BIGINT), +(CAST(metadata AS BIGINT), b)) AS INTERVAL SECOND))], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, metadata])
 ]]>
     </Resource>
   </TestCase>
@@ -215,112 +323,4 @@
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testWatermarkEmitStrategyWithOptions">
-    <Resource name="sql">
-      <![CDATA[select a, c from MyTable]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testWatermarkEmitStrategyWithHint">
-    <Resource name="sql">
-      <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.emit.strategy'='on-event') */]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.emit.strategy=on-event}]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.emit.strategy=on-event}]]])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testWatermarkAlignmentWithOptions">
-    <Resource name="sql">
-      <![CDATA[select a, c from MyTable]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testWatermarkAlignmentWithHint">
-    <Resource name="sql">
-      <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.alignment.group'='group1', 'scan.watermark.alignment.max-drift'='1min') */]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], watermarkAlignment=[group1, PT1M, PT1S]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.alignment.max-drift=1min, scan.watermark.alignment.group=group1}]]])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testIdleSourceWithOptions">
-    <Resource name="sql">
-      <![CDATA[select a, c from MyTable]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testIdleSourceWithHint">
-    <Resource name="sql">
-      <![CDATA[select a, c from MyTable /*+ OPTIONS('scan.watermark.idle-timeout' = '60s')*/]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[OPTIONS inheritPath:[] options:{scan.watermark.idle-timeout=60s}]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-FlinkLogicalCalc(select=[a, c])
-+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, 5000:INTERVAL SECOND)], idletimeout=[60000], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.watermark.idle-timeout=60s}]]])
-]]>
-    </Resource>
-  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index 118dc7f..f34df4c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -296,7 +296,7 @@
 LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()])
 +- LogicalProject(id=[$0], name=[$1])
    +- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], type=[$4], metadata_1=[$5], metadata_2=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -325,7 +325,7 @@
    +- LogicalProject(name=[$1], type=[$4], amount=[$2], metadata_1=[$5])
       +- LogicalFilter(condition=[=($0, 123)])
          +- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], type=[$4], metadata_1=[$5], metadata_2=[$6])
-            +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, inventory_meta, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
index 32fc618..7a90528 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
@@ -87,6 +87,44 @@
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testPushdownCalcNotAffectChangelogNormalizeKey">
+    <Resource name="sql">
+      <![CDATA[
+SELECT t1.a, t1.b, t2.f
+FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
+ ON t1.a = t2.a WHERE t2.f = true
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$1], b=[$2], f=[$6])
++- LogicalFilter(condition=[=($6, true)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
+      :- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$0])
+      :  +- LogicalProject(ingestion_time=[CAST($2):TIMESTAMP(3) *ROWTIME*], a=[$0], b=[$1])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, t1, metadata=[ts]]])
+      +- LogicalFilter(condition=[=($cor0.a, $2)])
+         +- LogicalSnapshot(period=[$cor0.ingestion_time])
+            +- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$1])
+               +- LogicalProject(k=[$0], ingestion_time=[CAST($3):TIMESTAMP(3) *ROWTIME*], a=[$1], f=[$2])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, t2, metadata=[ts]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, f], where=[f], changelogMode=[I])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, ingestion_time0, a0, f], changelogMode=[I])
+   :- Exchange(distribution=[hash[a]], changelogMode=[I])
+   :  +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I])
+   :     +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, b], changelogMode=[I])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, t1, metadata=[ts]]], fields=[a, b, ingestion_time], changelogMode=[I])
+   +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
+      +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I,UA,D])
+         +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, f], changelogMode=[I,UA,D])
+            +- TableSourceScan(table=[[default_catalog, default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, ingestion_time], changelogMode=[I,UA,D])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testPushdownNewCalcAndWatermarkAssignerWithCalc">
     <Resource name="sql">
       <![CDATA[
@@ -188,42 +226,4 @@
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testPushdownCalcNotAffectChangelogNormalizeKey">
-    <Resource name="sql">
-      <![CDATA[
-SELECT t1.a, t1.b, t2.f
-FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
- ON t1.a = t2.a WHERE t2.f = true
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$1], b=[$2], f=[$6])
-+- LogicalFilter(condition=[=($6, true)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
-      :- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$0])
-      :  +- LogicalProject(ingestion_time=[CAST($2):TIMESTAMP(3) *ROWTIME*], a=[$0], b=[$1])
-      :     +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
-      +- LogicalFilter(condition=[=($cor0.a, $2)])
-         +- LogicalSnapshot(period=[$cor0.ingestion_time])
-            +- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$1])
-               +- LogicalProject(k=[$0], ingestion_time=[CAST($3):TIMESTAMP(3) *ROWTIME*], a=[$1], f=[$2])
-                  +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[a, b, f], where=[f], changelogMode=[I])
-+- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, ingestion_time0, a0, f], changelogMode=[I])
-   :- Exchange(distribution=[hash[a]], changelogMode=[I])
-   :  +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I])
-   :     +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, b], changelogMode=[I])
-   :        +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b, ingestion_time], changelogMode=[I])
-   +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
-      +- WatermarkAssigner(rowtime=[ingestion_time], watermark=[ingestion_time], changelogMode=[I,UA,D])
-         +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS ingestion_time, a, f], changelogMode=[I,UA,D])
-            +- TableSourceScan(table=[[default_catalog, default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, ingestion_time], changelogMode=[I,UA,D])
-]]>
-    </Resource>
-  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index 53120e0..0fa4965 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -732,39 +732,6 @@
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCdcLeftJoinDimWithNonDeterministicPreFilter[nonDeterministicUpdateStrategy=IGNORE]">
-    <Resource name="sql">
-      <![CDATA[
-insert into sink_with_pk
-select t1.a, t2.b as version, t2.c
-from (
-  select *, proctime() proctime from cdc
-) t1 left join dim_with_pk for system_time as of t1.proctime as t2
-on t1.a = t2.a
-  and t1.b > UNIX_TIMESTAMP() - 300
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c])
-+- LogicalProject(a=[$0], version=[$6], c=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1, 4}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-      +- LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, -(UNIX_TIMESTAMP(), 300)))])
-         +- LogicalSnapshot(period=[$cor0.proctime])
-            +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c], upsertMaterialize=[true])
-+- Calc(select=[a, b0 AS version, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c])
-      +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testCdcJoinDimWithPkNonDeterministicLocalCondition[nonDeterministicUpdateStrategy=IGNORE]">
     <Resource name="sql">
       <![CDATA[
@@ -1128,6 +1095,39 @@
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCdcLeftJoinDimWithNonDeterministicPreFilter[nonDeterministicUpdateStrategy=IGNORE]">
+    <Resource name="sql">
+      <![CDATA[
+insert into sink_with_pk
+select t1.a, t2.b as version, t2.c
+from (
+  select *, proctime() proctime from cdc
+) t1 left join dim_with_pk for system_time as of t1.proctime as t2
+on t1.a = t2.a
+  and t1.b > UNIX_TIMESTAMP() - 300
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c])
++- LogicalProject(a=[$0], version=[$6], c=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1, 4}])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, -(UNIX_TIMESTAMP(), 300)))])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c], upsertMaterialize=[true])
++- Calc(select=[a, b0 AS version, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c])
+      +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testCdcLeftJoinDimWithoutPkSinkWithoutPk[nonDeterministicUpdateStrategy=IGNORE]">
     <Resource name="sql">
       <![CDATA[
@@ -1529,7 +1529,7 @@
 +- LogicalProject(a=[CAST($0):INTEGER], b=[CAST($4):BIGINT], c=[$7])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
       :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
       +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
 ]]>
     </Resource>
@@ -1538,7 +1538,7 @@
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true])
 +- Calc(select=[CAST(a AS INTEGER) AS a, CAST(metadata_1 AS BIGINT) AS b, EXPR$0 AS c])
    +- Correlate(invocation=[str_split($cor0.c)], correlate=[table(str_split($cor0.c))], select=[a,b,c,d,metadata_1,metadata_2,metadata_3,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, BOOLEAN d, INTEGER metadata_1, VARCHAR(2147483647) metadata_2, BIGINT metadata_3, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-      +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3])
+      +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3])
 ]]>
     </Resource>
   </TestCase>
@@ -1555,7 +1555,7 @@
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`legacy_retract_sink`], fields=[a, b, c])
 +- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1579,7 +1579,7 @@
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`legacy_upsert_sink`], fields=[a, b, c])
 +- LogicalProject(a=[CAST($0):INTEGER], b=[$6], c=[CAST($2):VARCHAR(100) CHARACTER SET "UTF-16LE"])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1604,7 +1604,7 @@
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`legacy_upsert_sink`], fields=[a, b, c])
 +- LogicalProject(a=[CAST($0):INTEGER], b=[$6], c=[CAST($2):VARCHAR(100) CHARACTER SET "UTF-16LE"])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1628,7 +1628,7 @@
 LogicalSink(table=[default_catalog.default_database.sink_with_composite_pk], fields=[a, b, c, e])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], e=[$5])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_3=[$4], e=[$4])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta_rename]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta_rename, metadata=[metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1651,7 +1651,7 @@
 LogicalSink(table=[default_catalog.default_database.sink_with_composite_pk], fields=[a, b, c, metadata_3])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_3=[$6])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1674,7 +1674,7 @@
 LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
 +- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1698,7 +1698,7 @@
 LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c])
 +- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -1723,7 +1723,7 @@
 LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c])
 +- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -2511,14 +2511,14 @@
          +- LogicalProject(a=[$1], b=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
             +- LogicalProject(id=[$0], a=[$1.nested2.num], name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)])
                +- LogicalProject(id=[$0], deepNested=[$1], name=[$2], metadata_1=[$3], metadata_2=[$4])
-                  +- LogicalTableScan(table=[[default_catalog, default_database, nested_src]])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, nested_src, metadata=[metadata_1, metadata_2]]])
 
 LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, b, d])
 +- LogicalProject(a=[$1], b=[$2], d=[CAST($3):BIGINT])
    +- LogicalFilter(condition=[>($3, 100)])
       +- LogicalProject(id=[$0], a=[$1.nested2.num], name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)])
          +- LogicalProject(id=[$0], deepNested=[$1], name=[$2], metadata_1=[$3], metadata_2=[$4])
-            +- LogicalTableScan(table=[[default_catalog, default_database, nested_src]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, nested_src, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -3073,7 +3073,7 @@
    :  +- LogicalTableScan(table=[[default_catalog, default_database, src]])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_3=[$6])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-         +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -3105,7 +3105,7 @@
    :  +- LogicalTableScan(table=[[default_catalog, default_database, src]])
    +- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-         +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -3137,7 +3137,7 @@
    :  +- LogicalTableScan(table=[[default_catalog, default_database, src]])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_3=[$6])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
-         +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index 6915a1c..5218853 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -138,7 +138,7 @@
 LogicalProject(a=[$0], b=[$1])
 +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], originTime=[$3], rowtime=[TO_TIMESTAMP_LTZ($3, 3)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyLtzTable]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyLtzTable, metadata=[originTime]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -176,7 +176,7 @@
 LogicalProject(a=[$0], b=[$1])
 +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], originTime=[$3], rowtime=[TO_TIMESTAMP(FROM_UNIXTIME(/($3, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, metadata=[originTime]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
index b50b7e4..b0aeef1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
@@ -241,13 +241,13 @@
 +- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
    +- LogicalProject(a=[$0], b=[$1], ts1=[$4], d1=[CAST($3):DECIMAL(28, 2)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 
 LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
 +- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
    +- LogicalProject(a=[$0], update_time=[$4])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], ts1=[$4], ts2=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -471,13 +471,13 @@
 +- LogicalProject(a1=[$0], b1=[CAST($1):VARCHAR(32) CHARACTER SET "UTF-16LE"], my_time1=[CAST($2):TIMESTAMP(6)], d1=[CAST($3):DECIMAL(20, 2)])
    +- LogicalProject(a=[$0], b=[$1], my_time=[$4], d1=[CAST($3):DECIMAL(28, 2)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 
 LogicalSink(table=[default_catalog.default_database.sink2], fields=[a2, update_time])
 +- LogicalProject(a2=[$0], update_time=[CAST($1):TIMESTAMP(6)])
    +- LogicalProject(a=[$0], update_time=[$4])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], my_time=[$4], unUse_time=[$5])
-         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, reuseTable, metadata=[ts1, ts2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 141570e..71d028c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -190,13 +190,13 @@
       <![CDATA[
 LogicalProject(a=[$0], other_metadata=[$1], b=[$2], c=[$3], metadata_1=[$4], computed=[$5])
 +- LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metadata_1=[$3], computed=[UPPER($3)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, CAST(other_metadata AS INTEGER) AS other_metadata, b, c, metadata_1, UPPER(metadata_1) AS computed])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, metadata_1, other_metadata])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_3]]], fields=[a, b, c, metadata_1, other_metadata])
 ]]>
     </Resource>
   </TestCase>
@@ -208,7 +208,7 @@
       <![CDATA[
 LogicalProject(b=[$2], other_metadata=[$1])
 +- LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metadata_1=[$3])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -226,13 +226,13 @@
       <![CDATA[
 LogicalProject(timestamp=[$0], metadata_timestamp=[$1], other=[$2], computed_other=[$3], computed_timestamp=[$4])
 +- LogicalProject(timestamp=[$0], metadata_timestamp=[$2], other=[$1], computed_other=[UPPER($1)], computed_timestamp=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[other, timestamp]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, other, metadata_timestamp])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, metadata=[other, timestamp]]], fields=[timestamp, other, metadata_timestamp])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 92df169..fc1a7ca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -515,7 +515,7 @@
       <![CDATA[
 LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[a, b, c, metadata_1, metadata_2])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], metadata_1=[$3], metadata_2=[CAST(CAST($4):INTEGER):BIGINT])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2, metadata_3]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -531,7 +531,7 @@
       <![CDATA[
 LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[metadata_1, metadata_2, other, metadata_23])
 +- LogicalProject(metadata_1=[$0], metadata_2=[$1], other=[$2], metadata_23=[$4])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 697fc02..d5ffd5f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -75,7 +75,7 @@
       <![CDATA[
 LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
 +- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -98,7 +98,7 @@
       <![CDATA[
 LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
 +- LogicalProject(id=[$0], deepNested=[$1], metadata_1=[$2], metadata_2=[$3])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T, metadata=[metadata_1, metadata_2]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -213,13 +213,13 @@
       <![CDATA[
 LogicalProject(id=[$0], ts1=[$5], op=[$3])
 +- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[id, (ts + 10000:INTERVAL SECOND) AS ts1, CAST(op AS VARCHAR(2147483647)) AS op])
-+- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, ts]]], fields=[id, name, op, ts])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]], fields=[id, name, op, tags, ts])
 ]]>
     </Resource>
   </TestCase>
@@ -231,7 +231,7 @@
       <![CDATA[
 LogicalProject(id=[$0], ts1=[$5], op=[$3])
 +- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -280,13 +280,13 @@
       <![CDATA[
 LogicalProject(ts=[$4], id=[$0], name=[$1], tags=[$2], op=[$3])
 +- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[ts, id, name, CAST(tags AS VARCHAR(2147483647)) AS tags, CAST(op AS VARCHAR(2147483647)) AS op])
-+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, name, op, tags, ts])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]], fields=[id, name, op, tags, ts])
 ]]>
     </Resource>
   </TestCase>
@@ -298,13 +298,13 @@
       <![CDATA[
 LogicalProject(id=[$0], ts=[$4], tags=[$2])
 +- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[id, ts, CAST(tags AS VARCHAR(2147483647)) AS tags])
-+- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[tags, ts]]], fields=[id, name, tags, ts])
++- TableSourceScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]], fields=[id, name, op, tags, ts])
 ]]>
     </Resource>
   </TestCase>
@@ -335,7 +335,7 @@
       <![CDATA[
 LogicalProject(id=[$0], ts=[$4], tags=[$2])
 +- LogicalProject(id=[$0], name=[$1], tags=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], op=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], ts=[$4], ts1=[+($4, 10000:INTERVAL SECOND)])
-   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[op, tags, ts]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index d328316..55f72f3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -265,7 +265,7 @@
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 +- LogicalProject($f0=[0])
    +- LogicalProject(sys_col=[$2], id=[$0], cnt=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[sys_col]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -286,7 +286,7 @@
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 +- LogicalProject($f0=[0])
    +- LogicalProject(sys_col=[$2], id=[$1], cnt=[$0])
-      +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[cnt, id, sys_col]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -307,7 +307,7 @@
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 +- LogicalProject($f0=[0])
    +- LogicalProject(nested=[$0], sys_col=[$3], id=[$1], cnt=[$2])
-      +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src, metadata=[sys_col]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">