IGNITE-21827: Sql. Union returns incorrect result for DECIMAL and INT columns (#3474)

diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
index 39d1212..1cb1e03 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
@@ -278,6 +278,25 @@
                 .check();
     }
 
+    @Test
+    public void testUnionDifferentNumericTypes() {
+        String query = ""
+                + "SELECT id, val FROM t1 "
+                + "UNION "
+                + "SELECT id, val FROM t2";
+
+        assertQuery(query)
+                .returns(1, new BigDecimal("1.00"))
+                .returns(2, new BigDecimal("2.00"))
+                .returns(3, new BigDecimal("3.00"))
+                .returns(4, new BigDecimal("4.00"))
+                .columnMetadata(
+                        new MetadataMatcher().type(ColumnType.INT32),
+                        new MetadataMatcher().type(ColumnType.DECIMAL)
+                )
+                .check();
+    }
+
     /**
      * Test that set op node can be rewinded.
      */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/UnionConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/UnionConverterRule.java
index 0105822..f506fc0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/UnionConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/UnionConverterRule.java
@@ -48,16 +48,18 @@
     /** {@inheritDoc} */
     @Override
     public void onMatch(RelOptRuleCall call) {
-        final LogicalUnion union = call.rel(0);
+        LogicalUnion union = call.rel(0);
 
         RelOptCluster cluster = union.getCluster();
         RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        List<RelNode> inputs = Commons.transform(union.getInputs(), input -> convert(input, traits));
 
-        RelNode res = new IgniteUnionAll(cluster, traits, inputs);
+        List<RelNode> inputs = Commons.transform(union.getInputs(), input -> convert(input, traits));
+        List<RelNode> convertedInputs = Commons.castInputsToLeastRestrictiveTypeIfNeeded(inputs, cluster, traits);
+
+        RelNode res = new IgniteUnionAll(cluster, traits, convertedInputs);
 
         if (!union.all) {
-            final RelBuilder relBuilder = relBuilderFactory.create(union.getCluster(), null);
+            RelBuilder relBuilder = relBuilderFactory.create(union.getCluster(), null);
 
             relBuilder
                     .push(res)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index fa1ac16..4e38162 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -66,11 +66,13 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.type.SqlTypeCoercionRule;
+import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
@@ -850,7 +852,8 @@
 
         // Output type of a set operator is equal to leastRestrictive(inputTypes) (see SetOp::deriveRowType)
 
-        RelDataType resultType = cluster.getTypeFactory().leastRestrictive(inputRowTypes);
+        RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+        RelDataType resultType = typeFactory.leastRestrictive(inputRowTypes);
         if (resultType == null) {
             throw new IllegalArgumentException("Cannot compute compatible row type for arguments to set op: " + inputRowTypes);
         }
@@ -864,7 +867,10 @@
         for (RelNode input : inputs) {
             RelDataType inputRowType = input.getRowType();
 
-            if (resultType.equalsSansFieldNames(inputRowType)) {
+            // We can ignore nullability because it is always safe to convert from
+            // ROW (T1 nullable, T2 not nullable) to ROW (T1 nullable, T2 nullable)
+            // and leastRestrictive does exactly that.
+            if (SqlTypeUtil.equalAsStructSansNullability(typeFactory, resultType, inputRowType, null)) {
                 actualInputs.add(input);
 
                 continue;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index ec2456f..784bc64 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -89,6 +89,7 @@
 import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
@@ -755,6 +756,16 @@
         rel.getInputs().forEach(this::clearHints);
     }
 
+    protected Predicate<? extends RelNode> projectFromTable(String tableName, String... exprs) {
+        return isInstanceOf(IgniteProject.class)
+                .and(projection -> {
+                    String actualProjStr = projection.getProjects().toString();
+                    String expectedProjStr = Arrays.asList(exprs).toString();
+                    return actualProjStr.equals(expectedProjStr);
+                })
+                .and(hasChildThat(isTableScan(tableName)));
+    }
+
     /**
      * TestTableDescriptor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
index 25f4f5c..8f956e3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SetOpPlannerTest.java
@@ -17,16 +17,12 @@
 
 package org.apache.ignite.internal.sql.engine.planner;
 
-import java.util.Arrays;
 import java.util.List;
-import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import org.apache.calcite.rel.RelDistribution.Type;
-import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
-import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedIntersect;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteColocatedMinus;
@@ -762,23 +758,13 @@
                 + setOp
                 + " SELECT * FROM table2";
 
-        assertPlan(sql, publicSchema, nodeOrAnyChild(isInstanceOf(setOp.map)
-                        .and(input(0, projectFromTable("TABLE1", "CAST($0):INTEGER", "$1")))
+        assertPlan(sql, publicSchema, nodeOrAnyChild(isInstanceOf(setOp.colocated)
+                        .and(input(0, isTableScan("TABLE1")))
                         .and(input(1, isTableScan("TABLE2")))
                 )
         );
     }
 
-    private Predicate<? extends RelNode> projectFromTable(String tableName, String... exprs) {
-        return isInstanceOf(IgniteProject.class)
-                .and(projection -> {
-                    String actualProjStr = projection.getProjects().toString();
-                    String expectedProjStr = Arrays.asList(exprs).toString();
-                    return actualProjStr.equals(expectedProjStr);
-                })
-                .and(hasChildThat(isTableScan(tableName)));
-    }
-
     private String setOp(SetOp setOp) {
         return setOp.name() + ' ';
     }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/UnionPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/UnionPlannerTest.java
index c207b25..3b55e93 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/UnionPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/UnionPlannerTest.java
@@ -19,6 +19,7 @@
 
 import java.util.function.UnaryOperator;
 import org.apache.calcite.rel.core.Union;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteUnionAll;
@@ -32,6 +33,7 @@
  * Union test.
  */
 public class UnionPlannerTest extends AbstractPlannerTest {
+
     @Test
     public void testUnion() throws Exception {
         IgniteSchema publicSchema = prepareSchema();
@@ -72,6 +74,74 @@
                 .and(input(2, hasChildThat(isTableScan("TABLE3")))));
     }
 
+    @Test
+    public void testUnionAllResultsInLeastRestrictiveType() throws Exception {
+        IgniteSchema publicSchema = createSchema(
+                TestBuilders.table()
+                        .name("TABLE1")
+                        .addColumn("C1", NativeTypes.INT32)
+                        .addColumn("C2", NativeTypes.STRING)
+                        .distribution(someAffinity())
+                        .build(),
+
+                TestBuilders.table()
+                        .name("TABLE2")
+                        .addColumn("C1", NativeTypes.DOUBLE)
+                        .addColumn("C2", NativeTypes.STRING)
+                        .distribution(someAffinity())
+                        .build(),
+
+                TestBuilders.table()
+                        .name("TABLE3")
+                        .addColumn("C1", NativeTypes.INT64)
+                        .addColumn("C2", NativeTypes.STRING)
+                        .distribution(someAffinity())
+                        .build()
+        );
+
+        String sql = ""
+                + "SELECT * FROM table1 "
+                + "UNION ALL "
+                + "SELECT * FROM table2 "
+                + "UNION ALL "
+                + "SELECT * FROM table3 ";
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteUnionAll.class)
+                .and(input(0, projectFromTable("TABLE1", "CAST($0):DOUBLE", "$1"))
+                .and(input(1, hasChildThat(isTableScan("TABLE2"))))
+                .and(input(2, projectFromTable("TABLE3", "CAST($0):DOUBLE", "$1"))))
+        );
+    }
+
+    @Test
+    public void testUnionAllDifferentNullability() throws Exception {
+        IgniteSchema publicSchema = createSchema(
+                TestBuilders.table()
+                        .name("TABLE1")
+                        .addColumn("C1", NativeTypes.INT32, false)
+                        .addColumn("C2", NativeTypes.STRING)
+                        .distribution(someAffinity())
+                        .build(),
+
+                TestBuilders.table()
+                        .name("TABLE2")
+                        .addColumn("C1", NativeTypes.INT32, true)
+                        .addColumn("C2", NativeTypes.STRING)
+                        .distribution(someAffinity())
+                        .build()
+        );
+
+        String sql = ""
+                + "SELECT * FROM table1 "
+                + "UNION ALL "
+                + "SELECT * FROM table2";
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteUnionAll.class)
+                .and(input(0, isInstanceOf(IgniteExchange.class).and(input(isTableScan("TABLE1"))))
+                        .and(input(1, isInstanceOf(IgniteExchange.class).and(input(isTableScan("TABLE2"))))))
+        );
+    }
+
     /**
      * Create schema for tests.
      *
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 4357cca..9062e14 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -133,8 +133,8 @@
         // Node 2 has the same type as leastRestrictive(row1, row2)
         assertSame(node2, relNodes.get(1), "Invalid types in projection for node2");
 
-        IgniteProject project3 = assertInstanceOf(IgniteProject.class, relNodes.get(2), "node2");
-        assertEquals(lt, project3.getRowType(), "Invalid types in projection for node3");
+        // Nullability is ignored
+        assertSame(node3, relNodes.get(2));
 
         IgniteProject project4 = assertInstanceOf(IgniteProject.class, relNodes.get(3), "node4");
         assertEquals(lt, project4.getRowType(), "Invalid types in projection for node4");