DRILL-7615: UNION ALL query returns the wrong result for the decimal value

closes #2006
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 8660989..596199a 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -20,6 +20,7 @@
 import static org.apache.drill.common.types.TypeProtos.DataMode.REPEATED;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -132,6 +133,17 @@
   }
 
   /**
+   * Returns true if all specified types are decimal data types.
+   *
+   * @param types types to check
+   * @return true if all specified types are decimal data type.
+   */
+  public static boolean areDecimalTypes(MinorType... types) {
+    return Arrays.stream(types)
+        .allMatch(Types::isDecimalType);
+  }
+
+  /**
    * Returns true if specified type is decimal data type.
    *
    * @param type type to check
@@ -795,15 +807,29 @@
    */
   public static MajorType.Builder calculateTypePrecisionAndScale(MajorType leftType, MajorType rightType, MajorType.Builder typeBuilder) {
     if (leftType.getMinorType().equals(rightType.getMinorType())) {
-      final boolean isScalarString = Types.isScalarStringType(leftType) && Types.isScalarStringType(rightType);
-      final boolean isDecimal = isDecimalType(leftType);
+      boolean isScalarString = Types.isScalarStringType(leftType) && Types.isScalarStringType(rightType);
+      boolean isDecimal = isDecimalType(leftType);
 
-      if ((isScalarString || isDecimal) && leftType.hasPrecision() && rightType.hasPrecision()) {
+      if (isScalarString && leftType.hasPrecision() && rightType.hasPrecision()) {
         typeBuilder.setPrecision(Math.max(leftType.getPrecision(), rightType.getPrecision()));
       }
 
-      if (isDecimal && leftType.hasScale() && rightType.hasScale()) {
-        typeBuilder.setScale(Math.max(leftType.getScale(), rightType.getScale()));
+      if (isDecimal) {
+        int scale = Math.max(leftType.getScale(), rightType.getScale());
+        // resulting precision should take into account resulting scale value and be calculated as
+        // sum of two components:
+        // - max integer digits number (precision - scale) for left and right;
+        // - resulting scale.
+        // So for the case of cast(9999 as decimal(4,0)) and cast(1.23 as decimal(3,2))
+        // resulting scale would be Max(0, 2) = 2 and resulting precision
+        // would be Max(4 - 0, 3 - 2) + 2 = 6.
+        // In this case, both values would fit into decimal(6, 2): 9999.00, 1.23
+        int leftNumberOfDigits = leftType.getPrecision() - leftType.getScale();
+        int rightNumberOfDigits = rightType.getPrecision() - rightType.getScale();
+        int precision = Math.max(leftNumberOfDigits, rightNumberOfDigits) + scale;
+
+        typeBuilder.setPrecision(precision);
+        typeBuilder.setScale(scale);
       }
     }
     return typeBuilder;
@@ -815,18 +841,31 @@
    *
    * @param type1 first type
    * @param type2 second type
-   * @return true if the two types are are the same minor type, mode,
+   * @return true if the two types have the same minor type, mode,
    * precision and scale
    */
 
   public static boolean isSameType(MajorType type1, MajorType type2) {
-    return type1.getMinorType() == type2.getMinorType() &&
-           type1.getMode() == type2.getMode() &&
+    return isSameTypeAndMode(type1, type2) &&
            type1.getScale() == type2.getScale() &&
            type1.getPrecision() == type2.getPrecision();
   }
 
   /**
+   * Check if two "core" types have the same minor type and data mode,
+   * ignoring subtypes and children. Primarily for non-complex types.
+   *
+   * @param first  first type to check
+   * @param second second type to check
+   * @return {@code true} if the two types have the same minor type and mode,
+   * {@code false} otherwise
+   */
+  public static boolean isSameTypeAndMode(MajorType first, MajorType second) {
+    return first.getMinorType() == second.getMinorType()
+        && first.getMode() == second.getMode();
+  }
+
+  /**
    * Requires full type equality, including fields such as precision and scale.
    * But, unset fields are equivalent to 0. Can't use the protobuf-provided
    * isEquals() which treats set and unset fields as different.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 90e8558..f3e8dc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -159,7 +159,7 @@
     // or both of them are decimal
     if (TypeCastRules.isNumericType(input1) && TypeCastRules.isNumericType(input2)
         && ((!Types.isDecimalType(input1) && !Types.isDecimalType(input2))
-          || Types.isDecimalType(input1) && Types.isDecimalType(input2))) {
+          || Types.areDecimalTypes(input1, input2))) {
       return true;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 2039e79..ce68fa4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -30,6 +30,7 @@
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -175,7 +176,7 @@
     transfers.clear();
     allocationVectors.clear();
 
-    final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions());
+    ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // cg.getCodeGenerator().saveCodeForDebugging(true);
 
@@ -184,38 +185,37 @@
       ValueVector vvIn = vw.getValueVector();
       ValueVector vvOut = container.getValueVector(index).getValueVector();
 
-      final ErrorCollector collector = new ErrorCollectorImpl();
-      // According to input data names, Minortypes, Datamodes, choose to
+      MaterializedField inField = vvIn.getField();
+      MaterializedField outputField = vvOut.getField();
+
+      ErrorCollector collector = new ErrorCollectorImpl();
+      // According to input data names, MinorTypes, DataModes, choose to
       // transfer directly,
       // rename columns or
-      // cast data types (Minortype or DataMode)
-      if (container.getSchema().getColumn(index).hasSameTypeAndMode(vvIn.getField())
-          && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
-          ) {
+      // cast data types (MinorType or DataMode)
+      if (areAssignableTypes(inField.getType(), outputField.getType())) {
         // Transfer column
         TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
-      } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
+      } else if (inField.getType().getMinorType() == TypeProtos.MinorType.NULL) {
         continue;
       } else { // Copy data in order to rename the column
-        SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
-        MaterializedField inField = vvIn.getField();
-        MaterializedField outputField = vvOut.getField();
+        SchemaPath inputPath = SchemaPath.getSimplePath(inField.getName());
 
         LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
         collector.reportErrors(logger);
 
         // If the inputs' DataMode is required and the outputs' DataMode is not required
         // cast to the one with the least restriction
-        if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
+        if (inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
             && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
           expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
           collector.reportErrors(logger);
         }
 
-        // If two inputs' MinorTypes are different,
-        // Insert a cast before the Union operation
-        if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
+        // If two inputs' MinorTypes are different or types are decimal with different scales,
+        // inserts a cast before the Union operation
+        if (isCastRequired(inField.getType(), outputField.getType())) {
           expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
           collector.reportErrors(logger);
         }
@@ -239,6 +239,38 @@
     }
   }
 
+  /**
+   * Checks whether cast should be added for transitioning values from the one type to another one.
+   * {@code true} will be returned if minor types differ or scales differ for the decimal data type.
+   *
+   * @param first  first type
+   * @param second second type
+   * @return {@code true} if cast should be added, {@code false} otherwise
+   */
+  private boolean isCastRequired(MajorType first, MajorType second) {
+    return first.getMinorType() != second.getMinorType()
+        || (Types.areDecimalTypes(second.getMinorType(), first.getMinorType())
+            && second.getScale() != first.getScale());
+  }
+
+  /**
+   * Checks whether data may be transitioned between specified types without using casts.
+   * {@code true} will be returned if minor types and data modes are the same for non-decimal data types,
+   * or if minor types, data modes and scales are the same for decimal data types.
+   *
+   * @param first  first type
+   * @param second second type
+   * @return {@code true} if data may be transitioned between specified types without using casts,
+   * {@code false} otherwise
+   */
+  private boolean areAssignableTypes(MajorType first, MajorType second) {
+    boolean areDecimalTypes = Types.areDecimalTypes(first.getMinorType(), second.getMinorType());
+
+    return Types.isSameTypeAndMode(first, second)
+        && second.getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+        && (!areDecimalTypes || first.getScale() == second.getScale()); // scale should match for decimal data types
+  }
+
   // The output table's column names always follow the left table,
   // where the output type is chosen based on DRILL's implicit casting rules
   private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
@@ -250,8 +282,10 @@
       MaterializedField leftField  = leftIter.next();
       MaterializedField rightField = rightIter.next();
 
-      if (leftField.hasSameTypeAndMode(rightField)) {
-        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+      if (Types.isSameTypeAndMode(leftField.getType(), rightField.getType())) {
+        MajorType.Builder builder = MajorType.newBuilder()
+            .setMinorType(leftField.getType().getMinorType())
+            .setMode(leftField.getDataMode());
         builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
         container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
       } else if (Types.isUntypedNull(rightField.getType())) {
@@ -261,7 +295,7 @@
       } else {
         // If the output type is not the same,
         // cast the column of one of the table to a data type which is the Least Restrictive
-        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
+        MajorType.Builder builder = MajorType.newBuilder();
         if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
           builder.setMinorType(leftField.getType().getMinorType());
           builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
index 072fa98..1afd3b3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestVarlenDecimal.java
@@ -18,11 +18,18 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -32,18 +39,25 @@
 
 import java.math.BigDecimal;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
 
 @Category({ParquetTest.class, UnlikelyTest.class})
-public class TestVarlenDecimal extends BaseTestQuery {
+public class TestVarlenDecimal extends ClusterTest {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
 
   @BeforeClass
   public static void enableDecimalDataType() {
-    setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+    client.alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
   }
 
   @AfterClass
   public static void disableDecimalDataType() {
-    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+    client.resetSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
   }
 
   private static final String DATAFILE = "cp.`parquet/varlenDecimal.parquet`";
@@ -89,11 +103,11 @@
 
     String tableName = "jsonWithDecimals";
     try {
-      alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "json");
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "json");
 
       String bigDecimalValue = "987654321987654321987654321.987654321";
 
-      test(
+      run(
           "create table dfs.tmp.%s as\n" +
               "select cast('%s' as decimal(36, 9)) dec36", tableName, bigDecimalValue);
 
@@ -110,8 +124,8 @@
           .baselineValues(new BigDecimal(bigDecimalValue).doubleValue())
           .go();
     } finally {
-      resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+      run("drop table if exists dfs.tmp.%s", tableName);
     }
   }
 
@@ -119,11 +133,11 @@
   public void testWriteReadCsv() throws Exception {
     String tableName = "csvWithDecimals";
     try {
-      alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csvh");
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csvh");
 
       String bigDecimalValue = "987654321987654321987654321.987654321";
 
-      test(
+      run(
           "create table dfs.tmp.%s as\n" +
               "select cast('%s' as decimal(36, 9)) dec36", tableName, bigDecimalValue);
 
@@ -139,8 +153,41 @@
           .baselineValues(new BigDecimal(bigDecimalValue))
           .go();
     } finally {
-      resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION);
-      test("drop table if exists dfs.tmp.%s", tableName);
+      client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+      run("drop table if exists dfs.tmp.%s", tableName);
+    }
+  }
+
+  @Test
+  public void testUnionAllWithDifferentScales() throws Exception {
+    try {
+      run("create table dfs.tmp.t as select cast(999999999999999 as decimal(15,0)) as d");
+
+      String query = "select cast(1000 as decimal(10,1)) as d\n" +
+          "union all \n" +
+          "select 596.000 as d \n" +
+          "union all \n" +
+          "select d from dfs.tmp.t";
+
+      testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("d")
+          .baselineValues(new BigDecimal("1000.000"))
+          .baselineValues(new BigDecimal("596.000"))
+          .baselineValues(new BigDecimal("999999999999999.000"))
+          .go();
+
+      List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Collections.singletonList(Pair.of(
+          SchemaPath.getSimplePath("d"),
+          Types.withPrecisionAndScale(MinorType.VARDECIMAL, DataMode.REQUIRED, 18, 3)));
+
+      testBuilder()
+          .sqlQuery(query)
+          .schemaBaseLine(expectedSchema)
+          .go();
+    } finally {
+      run("drop table if exists dfs.tmp.t");
     }
   }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index bb734de..4e7b298 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -451,16 +451,6 @@
     return toString(true);
   }
 
-  /**
-   * Return true if two fields have identical MinorType and Mode.
-   * @param that
-   * @return
-   */
-  public boolean hasSameTypeAndMode(MaterializedField that) {
-    return (getType().getMinorType() == that.getType().getMinorType())
-        && (getType().getMode() == that.getType().getMode());
-  }
-
   private String toString(Collection<?> collection, int maxLen) {
     final StringBuilder builder = new StringBuilder();
     int i = 0;