DRILL-8408: Allow Implicit Casts on Join (#2772)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e83615e..b341fb6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -28,7 +28,7 @@
jobs:
build:
name: Main Build
- runs-on: ubuntu-latest
+ runs-on: ubuntu-20.04
timeout-minutes: 150
strategy:
matrix:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 73aac06..9b32005 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -191,6 +191,11 @@
public static final BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY,
new OptionDescription("Hash Aggregates ignore memory limits when enabled (true). When disabled (false), Hash Aggregates fail when memory is set too low."));
+ public static final String IMPLICIT_CAST_FOR_JOINS_ENABLED = "drill.exec.implicit_casts.joins.enabled";
+ public static final BooleanValidator IMPLICIT_CAST_FOR_JOINS_ENABLED_VALIDATOR = new BooleanValidator(IMPLICIT_CAST_FOR_JOINS_ENABLED,
+ new OptionDescription("When true, this option enables implicit casts for joins. This is an experimental feature in Drill 1.21.1"));
+
+
// Partitioner options
public static final String PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY = "exec.partition.mem_throttle";
public static final LongValidator PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 5cb279d..d6b7fff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -38,7 +38,9 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -52,6 +54,8 @@
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.resolver.TypeCastRules;
import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -66,7 +70,7 @@
INEQUALITY, // inequality join: <>, <, >
CARTESIAN // no join condition
}
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinUtils.class);
+ private static final Logger logger = LoggerFactory.getLogger(JoinUtils.class);
public static final String FAILED_TO_PLAN_CARTESIAN_JOIN = String.format(
"This query cannot be planned possibly due to either a cartesian join or an inequality join. %n" +
@@ -148,36 +152,37 @@
}
/**
- * Checks if implicit cast is allowed between the two input types of the join condition. Currently we allow
- * implicit casts in join condition only between numeric types and varchar/varbinary types.
- * @param input1
- * @param input2
+ * Checks if implicit cast is allowed between the two input types of the join condition. Currently, we allow
+ * implicit casts in join condition only between numeric, date/timestamp and varchar/varbinary types.
+ * @param input1 The {@link MinorType} of one side of the join condition.
+ * @param input2 The {@link MinorType} of the other side of the join condition.
* @return true if implicit cast is allowed false otherwise
*/
- private static boolean allowImplicitCast(TypeProtos.MinorType input1, TypeProtos.MinorType input2) {
+ private static boolean allowImplicitCast(MinorType input1, MinorType input2) {
// allow implicit cast if both the input types are numeric and any of them is non-decimal
// or both of them are decimal
if (TypeCastRules.isNumericType(input1) && TypeCastRules.isNumericType(input2)
&& ((!Types.isDecimalType(input1) && !Types.isDecimalType(input2))
- || Types.areDecimalTypes(input1, input2))) {
+ || Types.areDecimalTypes(input1, input2))) {
return true;
}
// allow implicit cast if input types are date/ timestamp
- if ((input1 == TypeProtos.MinorType.DATE || input1 == TypeProtos.MinorType.TIMESTAMP) &&
- (input2 == TypeProtos.MinorType.DATE || input2 == TypeProtos.MinorType.TIMESTAMP)) {
+ if ((input1 == MinorType.DATE || input1 == MinorType.TIMESTAMP) &&
+ (input2 == MinorType.DATE || input2 == MinorType.TIMESTAMP)) {
return true;
}
// allow implicit cast if both the input types are varbinary/ varchar
- if ((input1 == TypeProtos.MinorType.VARCHAR || input1 == TypeProtos.MinorType.VARBINARY) &&
- (input2 == TypeProtos.MinorType.VARCHAR || input2 == TypeProtos.MinorType.VARBINARY)) {
+ if ((input1 == MinorType.VARCHAR || input1 == MinorType.VARBINARY) &&
+ (input2 == MinorType.VARCHAR || input2 == MinorType.VARBINARY)) {
return true;
}
return false;
}
+
+
/**
* Utility method used by joins to add implicit casts on one of the sides of the join condition in case the two
* expressions have different types.
@@ -203,16 +208,20 @@
}
if (rightType != leftType) {
- // currently we only support implicit casts if the input types are numeric or varchar/varbinary
- if (!allowImplicitCast(rightType, leftType)) {
- throw new DrillRuntimeException(String.format("Join only supports implicit casts between\n" +
- "1. Numeric data (none of types is decimal or both of them are decimal)\n" +
- "2. Varchar, Varbinary data\n3. Date, Timestamp data\n" +
- "Left type: %s, Right type: %s. Add explicit casts to avoid this error", leftType, rightType));
+ boolean implicitCasts = context.getOptions().getBoolean(ExecConstants.IMPLICIT_CAST_FOR_JOINS_ENABLED);
+
+ if (!implicitCasts) {
+ // If implicit casts are disallowed, revert to previous Drill behavior.
+ if (!allowImplicitCast(rightType, leftType)) {
+ throw new DrillRuntimeException(String.format("Join only supports implicit casts between\n" +
+ "1. Numeric data (none of types is decimal or both of them are decimal)\n" +
+ "2. Varchar, Varbinary data\n3. Date, Timestamp data\n" +
+ "Left type: %s, Right type: %s. Add explicit casts to avoid this error", leftType, rightType));
+ }
}
// We need to add a cast to one of the expressions
- TypeProtos.MinorType result = TypeCastRules.getLeastRestrictiveType(leftType, rightType);
+ MinorType result = TypeCastRules.getLeastRestrictiveType(leftType, rightType);
ErrorCollector errorCollector = new ErrorCollectorImpl();
if (result == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index c409053..659b3d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -251,6 +251,7 @@
new OptionDefinition(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.HASH_AGG_TABLE_FACTOR),
+ new OptionDefinition(ExecConstants.IMPLICIT_CAST_FOR_JOINS_ENABLED_VALIDATOR),
new OptionDefinition(ExecConstants.AVERAGE_FIELD_WIDTH),
new OptionDefinition(ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR),
new OptionDefinition(ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 981e519..749bd11 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -552,6 +552,7 @@
drill.exec.storage.implicit.row_group_length.column.label: "rgl",
drill.exec.storage.implicit.last_modified_time.column.label: "lmt",
drill.exec.storage.implicit.project_metadata.column.label: "$project_metadata$",
+ drill.exec.implicit_casts.joins.enabled: false,
drill.exec.testing.controls: "{}",
drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB
drill.exec.memory.operator.output_batch_size_avail_mem_factor : 0.1,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java b/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
index 98d3260..bb1d10f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSetOp.java
@@ -17,22 +17,27 @@
*/
package org.apache.drill;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchemaBuilder;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.util.StoragePluginTestUtils;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.SqlTest;
import org.apache.drill.categories.UnlikelyTest;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -45,6 +50,10 @@
import java.nio.file.Paths;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
@Category({SqlTest.class, OperatorTest.class})
public class TestSetOp extends ClusterTest {
private static final String EMPTY_DIR_NAME = "empty_directory";
@@ -439,15 +448,70 @@
.baselineValues(20L, 3L, 5L, 5L)
.build().run();
}
-
- @Test(expected = UserException.class)
- public void testImplicitCastingFailure() throws Exception {
+ /**
+ * With implicit join casts enabled, INTERSECT ALL of intData.json against booleanData.json and
+ * against intDataAsString.json must run instead of failing on the mismatched key types.
+ */
+ @Test
+ public void testImplicitCastingOnJoin() throws Exception {
+ client.alterSession(ExecConstants.IMPLICIT_CAST_FOR_JOINS_ENABLED, true);
String rootInt = "/store/json/intData.json";
String rootBoolean = "/store/json/booleanData.json";
+ String stringsAsInts = "/store/json/intDataAsString.json";
- run("(select key from cp.`%s` " +
- "intersect all " +
- "select key from cp.`%s` )", rootInt, rootBoolean);
+ try {
+ RowSet result = client.queryBuilder()
+ .sql("(select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s` )", rootInt, rootBoolean)
+ .rowSet();
+
+ assertEquals(0, result.rowCount());
+ result.clear();
+
+ result = client.queryBuilder()
+ .sql("(select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s` )", rootInt, stringsAsInts)
+ .rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("key", MinorType.BIGINT)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(52459253098448904L)
+ .addRow(1116675951L)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(result);
+ } finally {
+ // Restore the default even when an assertion fails: the session option would otherwise leak
+ // into later tests using this client (presumably shared per class - confirm in ClusterTest).
+ client.resetSession(ExecConstants.IMPLICIT_CAST_FOR_JOINS_ENABLED);
+ }
+ }
+
+ /**
+ * With the option at its default (false), the integer/string INTERSECT ALL must still be
+ * rejected with the "Join only supports implicit casts" error.
+ */
+ @Test
+ public void testImplicitCastingOnJoinDisabled() throws Exception {
+ // Do not depend on test execution order: force the option back to its default first.
+ client.resetSession(ExecConstants.IMPLICIT_CAST_FOR_JOINS_ENABLED);
+ String rootInt = "/store/json/intData.json";
+ String stringsAsInts = "/store/json/intDataAsString.json";
+
+ try {
+ client.queryBuilder()
+ .sql("(select key from cp.`%s` " +
+ "intersect all " +
+ "select key from cp.`%s` )", rootInt, stringsAsInts)
+ .run();
+ fail("Expected a UserException because implicit casts for joins are disabled");
+ } catch (UserException e) {
+ assertTrue(e.getMessage().contains("Join only supports implicit casts"));
+ }
}
@Test
diff --git a/exec/java-exec/src/test/resources/store/json/intDataAsString.json b/exec/java-exec/src/test/resources/store/json/intDataAsString.json
new file mode 100644
index 0000000..98f8421
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/intDataAsString.json
@@ -0,0 +1,3 @@
+{"key":"52459253098448904"}
+{"key":"1116675951"}
+{"key": "1"}
\ No newline at end of file