IGNITE-21720 Sql. Implement hash join (#3608)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlJoinBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlJoinBenchmark.java
new file mode 100644
index 0000000..3cc3ee2
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlJoinBenchmark.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.sql.engine.util.Commons.IN_BUFFER_SIZE;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.sql.IgniteSql;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that runs join sql queries via embedded client on clusters of different size.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@SuppressWarnings({"WeakerAccess", "unused"})
+public class SqlJoinBenchmark extends AbstractMultiNodeBenchmark {
+ private static final int TABLE_SIZE = 2 * IN_BUFFER_SIZE + 1;
+
+ private IgniteSql sql;
+
+ @Param({"1", "2"})
+ private int clusterSize;
+
+ /** Fills the table with data. */
+ @Setup
+ public void setUp() throws IOException {
+ populateTable(TABLE_NAME, TABLE_SIZE, 1_000);
+
+ sql = clusterNode.sql();
+ }
+
+ /**
+ * Benchmark left hash join.
+ */
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void leftHashJoin(Blackhole bh) {
+ try (var rs = sql.execute(null, ""
+ + "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ t1.field1 "
+ + "FROM usertable t1 "
+ + "LEFT JOIN usertable t2 "
+ + "on t1.field2 = t2.field2")) {
+ while (rs.hasNext()) {
+ bh.consume(rs.next());
+ }
+ }
+ }
+
+ /**
+ * Benchmark left merge join.
+ */
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void leftMergeJoin(Blackhole bh) {
+ try (var rs = sql.execute(null, ""
+ + "SELECT /*+ DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ t1.field1 "
+ + "FROM usertable t1 "
+ + "LEFT JOIN usertable t2 "
+ + "on t1.field2 = t2.field2")) {
+ while (rs.hasNext()) {
+ bh.consume(rs.next());
+ }
+ }
+ }
+
+ /**
+ * Benchmark left nl join.
+ */
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void leftNestedJoin(Blackhole bh) {
+ try (var rs = sql.execute(null, ""
+ + "SELECT /*+ DISABLE_RULE('HashJoinConverter', 'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ t1.field1 "
+ + "FROM usertable t1 "
+ + "LEFT JOIN usertable t2 "
+ + "on t1.field2 = t2.field2")) {
+ while (rs.hasNext()) {
+ bh.consume(rs.next());
+ }
+ }
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + SqlJoinBenchmark.class.getSimpleName() + ".*Join")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ @Override
+ protected int nodes() {
+ return clusterSize;
+ }
+}
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index 86cbb06..4d5bf44 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -324,6 +324,7 @@
+ "AS agg ON t2_colo_va1.val1 = agg.val1";
assertQuery(sql)
+ .disableRules("HashJoinConverter", "MergeJoinConverter")
.matches(QueryChecker.matches(".*Exchange.*Join.*Colocated.*Aggregate.*"))
.returns("val0", 50L)
.returns("val1", 50L)
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index 99a804b..ac504d1 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -25,7 +25,8 @@
/** Tests for correlated queries. */
public class ItCorrelatesTest extends BaseSqlIntegrationTest {
- private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ ";
+ private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter', "
+ + "'HashJoinConverter') */ ";
@AfterEach
public void dropTables() {
@@ -49,7 +50,7 @@
* Tests resolving of collisions in correlates with correlate variables in the left hand.
*/
@Test
- public void testCorrelatesCollisionLeft() throws InterruptedException {
+ public void testCorrelatesCollisionLeft() {
sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
@@ -68,7 +69,7 @@
* Tests resolving of collisions in correlates with correlate variables in both, left and right hands.
*/
@Test
- public void testCorrelatesCollisionRight() throws InterruptedException {
+ public void testCorrelatesCollisionRight() {
sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java
index 939845a..c7ddb8e 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java
@@ -57,7 +57,8 @@
+ "FROM t0 JOIN t1 ON t0.i1=t1.i1 AND t0.i2=t1.i2";
assertQuery(sql)
- .disableRules("MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule")
+ .disableRules("HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter",
+ "FilterSpoolMergeToSortedIndexSpoolRule")
.returns(1, 1, 1, 1)
.check();
}
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index 2ca44e8..7c8a206 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -80,7 +80,7 @@
public void test(int rows, int partitions) throws InterruptedException {
prepareDataSet(rows, partitions);
- var res = sql("SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */"
+ var res = sql("SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter', 'HashJoinConverter') */"
+ "T0.val, T1.val FROM TEST0 as T0 "
+ "JOIN TEST1 as T1 on T0.jid = T1.jid "
);
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
index 600f60f..191ef5b 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
@@ -38,6 +38,9 @@
public static void beforeTestsStarted() {
sql("CREATE TABLE t1 (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT, c3 INT)");
sql("CREATE TABLE t2 (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT, c3 INT)");
+ sql("CREATE TABLE t3 (id INT PRIMARY KEY, c1 INTEGER)");
+ sql("CREATE TABLE checkNulls1 (id INT PRIMARY KEY, c1 INTEGER, c2 INTEGER)");
+ sql("CREATE TABLE checkNulls2 (id INT PRIMARY KEY, c1 INTEGER, c2 INTEGER)");
sql("create index t1_idx on t1 (c3, c2, c1)");
sql("create index t2_idx on t2 (c3, c2, c1)");
@@ -59,6 +62,208 @@
new Object[] {4, 3, 3, 3},
new Object[] {5, 4, 4, 4}
);
+
+ insertData("t3", List.of("ID", "C1"),
+ new Object[] {0, 1},
+ new Object[] {1, 2},
+ new Object[] {2, 3},
+ new Object[] {3, null},
+ new Object[] {7, 7},
+ new Object[] {8, 8}
+ );
+
+ insertData("checkNulls1", List.of("ID", "C1", "C2"),
+ new Object[] {0, null, 1}
+ );
+
+ insertData("checkNulls2", List.of("ID", "C1", "C2"),
+ new Object[] {0, 1, null}
+ );
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testCheckNullsNullFirstLeftJoin(JoinType joinType) {
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 left join checkNulls1 t2 on t1.c1 = t2.c1;",
+ joinType
+ )
+ .returns(null, 1, null, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 left join checkNulls1 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, 1, null, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 left join checkNulls1 t2 on t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, 1, null, 1)
+ .check();
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testCheckNullsNullFirstRightJoin(JoinType joinType) {
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 right join checkNulls1 t2 on t1.c1 = t2.c1;",
+ joinType
+ )
+ .returns(null, null, null, 1)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 right join checkNulls1 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, null, null, 1)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 right join checkNulls1 t2 on t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, 1, null, 1)
+ .check();
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testCheckNullsNullFirstInnerJoin(JoinType joinType) {
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 inner join checkNulls1 t2 on t1.c1 = t2.c1;",
+ joinType
+ )
+ .returnNothing()
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 inner join checkNulls1 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+ joinType
+ )
+ .returnNothing()
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls1 t1 inner join checkNulls1 t2 on t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, 1, null, 1)
+ .check();
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testCheckNullsNullSecondLeftJoin(JoinType joinType) {
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 left join checkNulls2 t2 on t1.c1 = t2.c1;",
+ joinType
+ )
+ .returns(1, null, 1, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 left join checkNulls2 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(1, null, null, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 left join checkNulls2 t2 on t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(1, null, null, null)
+ .check();
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testCheckNullsNullSecondRightJoin(JoinType joinType) {
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 right join checkNulls2 t2 on t1.c1 = t2.c1;",
+ joinType
+ )
+ .returns(1, null, 1, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 right join checkNulls2 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, null, 1, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 right join checkNulls2 t2 on t1.c2 = t2.c2;",
+ joinType
+ )
+ .returns(null, null, 1, null)
+ .check();
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testCheckNullsNullSecondInnerJoin(JoinType joinType) {
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 inner join checkNulls2 t2 on t1.c1 = t2.c1;",
+ joinType
+ )
+ .returns(1, null, 1, null)
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 inner join checkNulls2 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+ joinType
+ )
+ .returnNothing()
+ .check();
+
+ assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+ + "from checkNulls2 t1 inner join checkNulls2 t2 on t1.c2 = t2.c2;",
+ joinType
+ )
+ .returnNothing()
+ .check();
+ }
+
+ @ParameterizedTest
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+ @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ public void testFullOuterJoin(JoinType joinType) {
+ assertQuery(""
+ + "select t1.c1 c11, t1.c2 c12, t1.c3 c13, t3.id c21, t3.c1 c22 "
+ + " from t1 "
+ + " full outer join t3 "
+ + " on t1.c1 = t3.id "
+ + " and t1.c2 = t3.c1 "
+ + " order by t1.c1, t1.c2, t1.c3, t3.id",
+ joinType
+ )
+ .ordered()
+ .returns(1, 1, 1, null, null)
+ .returns(2, 2, 2, null, null)
+ .returns(2, null, 2, null, null)
+ .returns(3, 3, 3, null, null)
+ .returns(3, 3, null, null, null)
+ .returns(4, 4, 4, null, null)
+ .returns(null, null, null, 0, 1)
+ .returns(null, null, null, 1, 2)
+ .returns(null, null, null, 2, 3)
+ .returns(null, null, null, 3, null)
+ .returns(null, null, null, 7, 7)
+ .returns(null, null, null, 8, 8)
+ .check();
}
/**
@@ -68,6 +273,15 @@
// TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
@EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
public void testInnerJoin(JoinType joinType) {
+ assertQuery("SELECT c1 FROM t3 WHERE c1 = ANY(SELECT c1 FROM t3) ORDER BY c1", joinType)
+ .ordered()
+ .returns(1)
+ .returns(2)
+ .returns(3)
+ .returns(7)
+ .returns(8)
+ .check();
+
assertQuery(""
+ "select t1.c1 c11, t1.c2 c12, t1.c3 c13, t2.c1 c21, t2.c2 c22 "
+ " from t1 "
@@ -250,13 +464,23 @@
*/
@ParameterizedTest
// TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
- @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+ @EnumSource(mode = Mode.EXCLUDE, names = {"CORRELATED"})
public void testLeftJoin(JoinType joinType) {
+ assertQuery("select t31.c1 from t3 t31 left join t3 t32 on t31.c1 = t32.c1 ORDER BY t31.c1;", joinType)
+ .ordered()
+ .returns(1)
+ .returns(2)
+ .returns(3)
+ .returns(7)
+ .returns(8)
+ .returns(null)
+ .check();
+
assertQuery(""
+ "select t1.c1 c11, t1.c2 c12, t1.c3 c13, t2.c1 c21, t2.c2 c22 "
+ " from t1 "
+ " left join t2 "
- + " on t1.c1 = t2.c1 "
+ + " on t1.c1 = t2.c1"
+ " and t1.c2 = t2.c2 "
+ " order by t1.c1, t1.c2, t1.c3",
joinType
@@ -843,6 +1067,8 @@
Stream<Arguments> types = Arrays.stream(JoinType.values())
// TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove filter below
.filter(type -> type != JoinType.CORRELATED)
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22074 hash join to make a deal with "is not distinct" expression
+ .filter(type -> type != JoinType.HASHJOIN)
.flatMap(v -> Stream.of(Arguments.of(v, false), Arguments.of(v, true)));
return types;
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index 41ce7ce..cbbe218 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -126,7 +126,7 @@
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-21286")
public void testIndexLoopJoin() {
- assertQuery("SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ d1.name, d2.name "
+ assertQuery("SELECT /*+ DISABLE_RULE('HashJoinConverter', 'MergeJoinConverter', 'NestedLoopJoinConverter') */ d1.name, d2.name "
+ "FROM Developer d1, Developer d2 WHERE d1.id = d2.id")
.matches(containsSubPlan("CorrelatedNestedLoopJoin"))
.returns("Bach", "Bach")
@@ -916,7 +916,7 @@
String sql = "SELECT t1.i1, t2.i1 FROM t t1 LEFT JOIN t t2 ON t1.i2 = t2.i1";
assertQuery(sql)
- .disableRules("NestedLoopJoinConverter", "MergeJoinConverter")
+ .disableRules("NestedLoopJoinConverter", "MergeJoinConverter", "HashJoinConverter")
.matches(containsSubPlan("CorrelatedNestedLoopJoin"))
.matches(containsIndexScan("PUBLIC", "T", "T_IDX"))
.returns(0, null)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 5ebeedd..4a09b90 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -57,6 +57,7 @@
import org.apache.ignite.internal.sql.engine.exec.rel.DataSourceScanNode;
import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode;
import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode;
+import org.apache.ignite.internal.sql.engine.exec.rel.HashJoinNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.IndexScanNode;
import org.apache.ignite.internal.sql.engine.exec.rel.IndexSpoolNode;
@@ -81,6 +82,7 @@
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
@@ -263,6 +265,24 @@
/** {@inheritDoc} */
@Override
+ public Node<RowT> visit(IgniteHashJoin rel) {
+ RelDataType outType = rel.getRowType();
+ RelDataType leftType = rel.getLeft().getRowType();
+ RelDataType rightType = rel.getRight().getRowType();
+ JoinRelType joinType = rel.getJoinType();
+
+ Node<RowT> node = HashJoinNode.create(ctx, outType, leftType, rightType, joinType, rel.analyzeCondition());
+
+ Node<RowT> leftInput = visit(rel.getLeft());
+ Node<RowT> rightInput = visit(rel.getRight());
+
+ node.register(asList(leftInput, rightInput));
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override
public Node<RowT> visit(IgniteCorrelatedNestedLoopJoin rel) {
RelDataType leftType = rel.getLeft().getRowType();
RelDataType rightType = rel.getRight().getRowType();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index 520fc69..d466d98 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -36,6 +36,7 @@
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
@@ -154,6 +155,11 @@
}
@Override
+ public Mapping visit(IgniteHashJoin rel) {
+ return mapBiRel(rel);
+ }
+
+ @Override
public Mapping visit(IgniteCorrelatedNestedLoopJoin rel) {
return mapBiRel(rel);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
new file mode 100644
index 0000000..194391e
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.jetbrains.annotations.Nullable;
+
+/** Right part materialized join node, i.e. all data from right part of join is available locally. */
+public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNode<RowT> {
+ /** Special value to highlights that all row were received and we are not waiting any more. */
+ static final int NOT_WAITING = -1;
+
+ protected boolean inLoop;
+ protected int requested;
+ int waitingLeft;
+ int waitingRight;
+ final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize);
+ protected @Nullable RowT left;
+
+ AbstractRightMaterializedJoinNode(ExecutionContext<RowT> ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void request(int rowsCnt) throws Exception {
+ assert !nullOrEmpty(sources()) && sources().size() == 2;
+ assert rowsCnt > 0 && requested == 0;
+
+ checkState();
+
+ requested = rowsCnt;
+
+ if (!inLoop) {
+ context().execute(this::doJoin, this::onError);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void rewindInternal() {
+ requested = 0;
+ waitingLeft = 0;
+ waitingRight = 0;
+ left = null;
+
+ leftInBuf.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected Downstream<RowT> requestDownstream(int idx) {
+ if (idx == 0) {
+ return new Downstream<>() {
+ /** {@inheritDoc} */
+ @Override
+ public void push(RowT row) throws Exception {
+ pushLeft(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void end() throws Exception {
+ endLeft();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onError(Throwable e) {
+ AbstractRightMaterializedJoinNode.this.onError(e);
+ }
+ };
+ } else if (idx == 1) {
+ return new Downstream<>() {
+ /** {@inheritDoc} */
+ @Override
+ public void push(RowT row) throws Exception {
+ pushRight(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void end() throws Exception {
+ endRight();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onError(Throwable e) {
+ AbstractRightMaterializedJoinNode.this.onError(e);
+ }
+ };
+ }
+
+ throw new IndexOutOfBoundsException();
+ }
+
+ private void pushLeft(RowT row) throws Exception {
+ assert downstream() != null;
+ assert waitingLeft > 0;
+
+ checkState();
+
+ waitingLeft--;
+
+ leftInBuf.add(row);
+
+ join();
+ }
+
+ private void endLeft() throws Exception {
+ assert downstream() != null;
+ assert waitingLeft > 0;
+
+ checkState();
+
+ waitingLeft = NOT_WAITING;
+
+ join();
+ }
+
+ private void endRight() throws Exception {
+ assert downstream() != null;
+ assert waitingRight > 0;
+
+ checkState();
+
+ waitingRight = NOT_WAITING;
+
+ join();
+ }
+
+ Node<RowT> leftSource() {
+ return sources().get(0);
+ }
+
+ Node<RowT> rightSource() {
+ return sources().get(1);
+ }
+
+ private void doJoin() throws Exception {
+ checkState();
+
+ join();
+ }
+
+ protected abstract void join() throws Exception;
+
+ protected abstract void pushRight(RowT row) throws Exception;
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
new file mode 100644
index 0000000..c07258a
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
+
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowBuilder;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+
+/** HashJoin implementor. */
+public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNode<RowT> {
+ final Map<RowWrapper<RowT>, TouchedCollection<RowT>> hashStore = new Object2ObjectOpenHashMap<>();
+ protected final RowHandler<RowT> handler;
+
+ private final List<Integer> leftJoinPositions;
+ private final List<Integer> rightJoinPositions;
+
+ final boolean touchResults;
+
+ Iterator<RowT> rightIt = Collections.emptyIterator();
+ private final RowSchema rightJoinRelatedRowSchema;
+ private final RowSchema leftJoinRelatedRowSchema;
+
+ private final RowBuilder<RowT> leftRowBuilder;
+ private final RowBuilder<RowT> rightRowBuilder;
+
+ private HashJoinNode(ExecutionContext<RowT> ctx, JoinInfo joinInfo, boolean touch,
+ RelDataType leftRowType, RelDataType rightRowType) {
+ super(ctx);
+
+ handler = ctx.rowHandler();
+ touchResults = touch;
+
+ leftJoinPositions = joinInfo.leftKeys.toIntegerList();
+ rightJoinPositions = joinInfo.rightKeys.toIntegerList();
+
+ assert leftJoinPositions.size() == rightJoinPositions.size();
+
+ ImmutableIntList rightKeys = joinInfo.rightKeys;
+ List<RelDataType> rightTypes = new ArrayList<>(rightKeys.size());
+ List<RelDataTypeField> rightFields = rightRowType.getFieldList();
+ for (int rightPos : rightKeys) {
+ rightTypes.add(rightFields.get(rightPos).getType());
+ }
+ rightJoinRelatedRowSchema = rowSchemaFromRelTypes(rightTypes);
+
+ ImmutableIntList leftKeys = joinInfo.leftKeys;
+ List<RelDataType> leftTypes = new ArrayList<>(leftKeys.size());
+ List<RelDataTypeField> leftFields = leftRowType.getFieldList();
+ for (int leftPos : leftKeys) {
+ leftTypes.add(leftFields.get(leftPos).getType());
+ }
+ leftJoinRelatedRowSchema = rowSchemaFromRelTypes(leftTypes);
+
+ RowFactory<RowT> leftRowFactory = handler.factory(leftJoinRelatedRowSchema);
+ leftRowBuilder = leftRowFactory.rowBuilder();
+
+ RowFactory<RowT> rightRowFactory = handler.factory(rightJoinRelatedRowSchema);
+ rightRowBuilder = rightRowFactory.rowBuilder();
+ }
+
+ @Override
+ protected void rewindInternal() {
+ rightIt = Collections.emptyIterator();
+
+ hashStore.clear();
+
+ super.rewindInternal();
+ }
+
+ /** Supplied algorithm implementation. */
+ public static <RowT> HashJoinNode<RowT> create(ExecutionContext<RowT> ctx, RelDataType outputRowType,
+ RelDataType leftRowType, RelDataType rightRowType, JoinRelType joinType, JoinInfo joinInfo) {
+
+ switch (joinType) {
+ case INNER:
+ return new InnerHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
+
+ case LEFT: {
+ RowSchema rightRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rightRowType));
+ RowHandler.RowFactory<RowT> rightRowFactory = ctx.rowHandler().factory(rightRowSchema);
+
+ return new LeftHashJoin<>(ctx, rightRowFactory, joinInfo, leftRowType, rightRowType);
+ }
+
+ case RIGHT: {
+ RowSchema leftRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(leftRowType));
+ RowHandler.RowFactory<RowT> leftRowFactory = ctx.rowHandler().factory(leftRowSchema);
+
+ return new RightHashJoin<>(ctx, leftRowFactory, joinInfo, leftRowType, rightRowType);
+ }
+
+ case FULL: {
+ RowSchema leftRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(leftRowType));
+ RowSchema rightRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rightRowType));
+ RowHandler.RowFactory<RowT> leftRowFactory = ctx.rowHandler().factory(leftRowSchema);
+ RowHandler.RowFactory<RowT> rightRowFactory = ctx.rowHandler().factory(rightRowSchema);
+
+ return new FullOuterHashJoin<>(ctx, leftRowFactory, rightRowFactory, joinInfo, leftRowType, rightRowType);
+ }
+
+ case SEMI:
+ return new SemiHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
+
+ case ANTI:
+ return new AntiHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
+
+ default:
+ throw new IllegalStateException("Join type \"" + joinType + "\" is not supported yet");
+ }
+ }
+
+ private static class InnerHashJoin<RowT> extends HashJoinNode<RowT> {
+ private InnerHashJoin(
+ ExecutionContext<RowT> ctx,
+ JoinInfo joinInfo,
+ RelDataType leftRowType,
+ RelDataType rightRowType
+ ) {
+ super(ctx, joinInfo, false, leftRowType, rightRowType);
+ }
+
+ @Override
+ protected void join() throws Exception {
+ if (waitingRight == NOT_WAITING) {
+ inLoop = true;
+ try {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+ if (!rightIt.hasNext()) {
+ left = leftInBuf.remove();
+
+ Collection<RowT> rightRows = lookup(left, touchResults);
+
+ rightIt = rightRows.iterator();
+ }
+
+ if (rightIt.hasNext()) {
+ while (rightIt.hasNext()) {
+ checkState();
+
+ RowT right = rightIt.next();
+
+ --requested;
+
+ RowT row = handler.concat(left, right);
+ downstream().push(row);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+ }
+
+ if (!rightIt.hasNext()) {
+ left = null;
+ }
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ getMoreOrEnd();
+ }
+ }
+
+ private static class LeftHashJoin<RowT> extends HashJoinNode<RowT> {
+ /** Right row factory. */
+ private final RowHandler.RowFactory<RowT> rightRowFactory;
+
+ private LeftHashJoin(
+ ExecutionContext<RowT> ctx,
+ RowHandler.RowFactory<RowT> rightRowFactory,
+ JoinInfo joinInfo,
+ RelDataType leftRowType,
+ RelDataType rightRowType
+ ) {
+ super(ctx, joinInfo, false, leftRowType, rightRowType);
+
+ this.rightRowFactory = rightRowFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void join() throws Exception {
+ if (waitingRight == NOT_WAITING) {
+ inLoop = true;
+ try {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+ checkState();
+
+ if (!rightIt.hasNext()) {
+ left = leftInBuf.remove();
+
+ Collection<RowT> rightRows = lookup(left, touchResults);
+
+ if (rightRows.isEmpty()) {
+ requested--;
+ downstream().push(handler.concat(left, rightRowFactory.create()));
+ }
+
+ rightIt = rightRows.iterator();
+ }
+
+ if (rightIt.hasNext()) {
+ while (rightIt.hasNext()) {
+ checkState();
+
+ RowT right = rightIt.next();
+
+ --requested;
+
+ RowT row = handler.concat(left, right);
+ downstream().push(row);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+ }
+
+ if (!rightIt.hasNext()) {
+ left = null;
+ }
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ getMoreOrEnd();
+ }
+ }
+
+ private static class RightHashJoin<RowT> extends HashJoinNode<RowT> {
+ /** Left row factory. */
+ private final RowHandler.RowFactory<RowT> leftRowFactory;
+
+ private RightHashJoin(
+ ExecutionContext<RowT> ctx,
+ RowHandler.RowFactory<RowT> leftRowFactory,
+ JoinInfo joinInfo,
+ RelDataType leftRowType,
+ RelDataType rightRowType
+ ) {
+ super(ctx, joinInfo, true, leftRowType, rightRowType);
+
+ this.leftRowFactory = leftRowFactory;
+ }
+
+ @Override
+ protected void join() throws Exception {
+ if (waitingRight == NOT_WAITING) {
+ inLoop = true;
+ try {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+ checkState();
+
+ if (!rightIt.hasNext()) {
+ left = leftInBuf.remove();
+
+ Collection<RowT> rightRows = lookup(left, touchResults);
+
+ rightIt = rightRows.iterator();
+ }
+
+ if (rightIt.hasNext()) {
+ while (rightIt.hasNext()) {
+ checkState();
+
+ RowT right = rightIt.next();
+
+ --requested;
+
+ RowT row = handler.concat(left, right);
+ downstream().push(row);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+ }
+
+ if (!rightIt.hasNext()) {
+ left = null;
+ }
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) {
+ inLoop = true;
+ try {
+ if (!rightIt.hasNext()) {
+ rightIt = getUntouched(hashStore);
+ }
+
+ RowT emptyLeft = leftRowFactory.create();
+
+ while (rightIt.hasNext()) {
+ checkState();
+ RowT right = rightIt.next();
+ RowT row = handler.concat(emptyLeft, right);
+ --requested;
+
+ downstream().push(row);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ getMoreOrEnd();
+ }
+ }
+
+ private static class FullOuterHashJoin<RowT> extends HashJoinNode<RowT> {
+ /** Left row factory. */
+ private final RowHandler.RowFactory<RowT> leftRowFactory;
+
+ /** Right row factory. */
+ private final RowHandler.RowFactory<RowT> rightRowFactory;
+
+ private FullOuterHashJoin(
+ ExecutionContext<RowT> ctx,
+ RowHandler.RowFactory<RowT> leftRowFactory,
+ RowHandler.RowFactory<RowT> rightRowFactory,
+ JoinInfo joinInfo,
+ RelDataType leftRowType,
+ RelDataType rightRowType
+ ) {
+ super(ctx, joinInfo, true, leftRowType, rightRowType);
+
+ this.leftRowFactory = leftRowFactory;
+ this.rightRowFactory = rightRowFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void join() throws Exception {
+ if (waitingRight == NOT_WAITING) {
+ inLoop = true;
+ try {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+ checkState();
+
+ if (!rightIt.hasNext()) {
+ left = leftInBuf.remove();
+
+ Collection<RowT> rightRows = lookup(left, touchResults);
+
+ if (rightRows.isEmpty()) {
+ requested--;
+ downstream().push(handler.concat(left, rightRowFactory.create()));
+ }
+
+ rightIt = rightRows.iterator();
+ }
+
+ if (rightIt.hasNext()) {
+ while (rightIt.hasNext()) {
+ checkState();
+
+ RowT right = rightIt.next();
+
+ --requested;
+
+ RowT row = handler.concat(left, right);
+ downstream().push(row);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+ }
+
+ if (!rightIt.hasNext()) {
+ left = null;
+ }
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ if (left == null && !rightIt.hasNext() && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING
+ && waitingRight == NOT_WAITING && requested > 0) {
+ inLoop = true;
+ try {
+ if (!rightIt.hasNext()) {
+ rightIt = getUntouched(hashStore);
+ }
+
+ RowT emptyLeft = leftRowFactory.create();
+
+ while (rightIt.hasNext()) {
+ checkState();
+ RowT right = rightIt.next();
+ RowT row = handler.concat(emptyLeft, right);
+ --requested;
+
+ downstream().push(row);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ getMoreOrEnd();
+ }
+ }
+
+ private static class SemiHashJoin<RowT> extends HashJoinNode<RowT> {
+ private SemiHashJoin(
+ ExecutionContext<RowT> ctx,
+ JoinInfo joinInfo,
+ RelDataType leftRowType,
+ RelDataType rightRowType
+ ) {
+ super(ctx, joinInfo, false, leftRowType, rightRowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void join() throws Exception {
+ if (waitingRight == NOT_WAITING) {
+ inLoop = true;
+ try {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+ checkState();
+
+ left = leftInBuf.remove();
+
+ Collection<RowT> rightRows = lookup(left, touchResults);
+
+ if (!rightRows.isEmpty()) {
+ requested--;
+
+ downstream().push(left);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+
+ left = null;
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ getMoreOrEnd();
+ }
+ }
+
+ private static class AntiHashJoin<RowT> extends HashJoinNode<RowT> {
+ private AntiHashJoin(
+ ExecutionContext<RowT> ctx,
+ JoinInfo joinInfo,
+ RelDataType leftRowType,
+ RelDataType rightRowType
+ ) {
+ super(ctx, joinInfo, false, leftRowType, rightRowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void join() throws Exception {
+ if (waitingRight == NOT_WAITING) {
+ inLoop = true;
+ try {
+ while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+ checkState();
+
+ left = leftInBuf.remove();
+
+ Collection<RowT> rightRows = lookup(left, touchResults);
+
+ if (rightRows.isEmpty()) {
+ requested--;
+
+ downstream().push(left);
+
+ if (requested == 0) {
+ break;
+ }
+ }
+
+ left = null;
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ getMoreOrEnd();
+ }
+ }
+
+ Collection<RowT> lookup(RowT row, boolean processTouched) {
+ Collection<RowT> coll = Collections.emptyList();
+
+ for (Integer entry : leftJoinPositions) {
+ Object ent = handler.get(entry, row);
+
+ if (ent == null) {
+ leftRowBuilder.reset();
+ return Collections.emptyList();
+ }
+
+ leftRowBuilder.addField(ent);
+ }
+
+ RowWrapper<RowT> row0 = new RowWrapper<>(leftRowBuilder.buildAndReset(), handler, leftJoinPositions.size());
+
+ TouchedCollection<RowT> found = hashStore.get(row0);
+
+ if (found != null) {
+ coll = found.items();
+
+ if (processTouched) {
+ found.touched = true;
+ }
+ }
+
+ return coll;
+ }
+
+ private static <RowT> Iterator<RowT> getUntouched(Map<RowWrapper<RowT>, TouchedCollection<RowT>> entries) {
+ return new Iterator<RowT>() {
+ private final Iterator<TouchedCollection<RowT>> it = entries.values().iterator();
+ private Iterator<RowT> innerIt = Collections.emptyIterator();
+
+ @Override
+ public boolean hasNext() {
+ if (innerIt.hasNext()) {
+ return true;
+ }
+
+ advance();
+
+ return innerIt.hasNext();
+ }
+
+ @Override
+ public RowT next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return innerIt.next();
+ }
+
+ void advance() {
+ while (it.hasNext()) {
+ TouchedCollection<RowT> coll = it.next();
+ if (!coll.touched && !coll.items().isEmpty()) {
+ innerIt = coll.items().iterator();
+ break;
+ }
+ }
+ }
+ };
+ }
+
+ @Override
+ protected void pushRight(RowT row) throws Exception {
+ assert downstream() != null;
+ assert waitingRight > 0;
+
+ checkState();
+
+ waitingRight--;
+
+ for (Integer entry : rightJoinPositions) {
+ Object ent = handler.get(entry, row);
+ rightRowBuilder.addField(ent);
+ }
+
+ RowWrapper<RowT> row0 = new RowWrapper<>(rightRowBuilder.buildAndReset(), handler, rightJoinPositions.size());
+ TouchedCollection<RowT> raw = hashStore.computeIfAbsent(row0, k -> new TouchedCollection<>());
+ raw.add(row);
+
+ if (waitingRight == 0) {
+ rightSource().request(waitingRight = inBufSize);
+ }
+ }
+
+ private static class RowWrapper<RowT> {
+ RowT row;
+ RowHandler<RowT> handler;
+ int itemsCount;
+
+ RowWrapper(RowT row, RowHandler<RowT> handler, int itemsCount) {
+ this.row = row;
+ this.handler = handler;
+ this.itemsCount = itemsCount;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 0;
+ for (int i = 0; i < itemsCount; ++i) {
+ Object entHold = handler.get(i, row);
+ hashCode += Objects.hashCode(entHold);
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ RowWrapper<RowT> row0 = (RowWrapper<RowT>) obj;
+ for (int i = 0; i < itemsCount; ++i) {
+ Object input = handler.get(i, row0.row);
+ Object current = handler.get(i, row);
+ boolean comp = Objects.equals(input, current);
+ if (!comp) {
+ return comp;
+ }
+ }
+ return true;
+ }
+ }
+
+ void getMoreOrEnd() throws Exception {
+ if (waitingRight == 0) {
+ rightSource().request(waitingRight = inBufSize);
+ }
+
+ if (waitingLeft == 0 && leftInBuf.isEmpty()) {
+ leftSource().request(waitingLeft = inBufSize);
+ }
+
+ if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && leftInBuf.isEmpty() && left == null
+ && !rightIt.hasNext()) {
+ requested = 0;
+ downstream().end();
+ }
+ }
+
+ private static class TouchedCollection<RowT> {
+ Collection<RowT> coll;
+ boolean touched;
+
+ TouchedCollection() {
+ this.coll = new ArrayList<>();
+ }
+
+ void add(RowT row) {
+ coll.add(row);
+ }
+
+ Collection<RowT> items() {
+ return coll;
+ }
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java
index bcbebb0..a2807b1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java
@@ -18,12 +18,9 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Deque;
import java.util.List;
import java.util.function.BiPredicate;
import org.apache.calcite.plan.RelOptUtil;
@@ -38,25 +35,12 @@
* NestedLoopJoinNode.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-public abstract class NestedLoopJoinNode<RowT> extends AbstractNode<RowT> {
- /** Special value to highlights that all row were received and we are not waiting any more. */
- protected static final int NOT_WAITING = -1;
-
+public abstract class NestedLoopJoinNode<RowT> extends AbstractRightMaterializedJoinNode<RowT> {
protected final BiPredicate<RowT, RowT> cond;
protected final RowHandler<RowT> handler;
- protected int requested;
-
- protected int waitingLeft;
-
- protected int waitingRight;
-
- protected final List<RowT> rightMaterialized = new ArrayList<>(inBufSize);
-
- protected final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize);
-
- protected boolean inLoop;
+ final List<RowT> rightMaterialized = new ArrayList<>(inBufSize);
/**
* Constructor.
@@ -65,7 +49,7 @@
* @param ctx Execution context.
* @param cond Join expression.
*/
- private NestedLoopJoinNode(ExecutionContext<RowT> ctx, BiPredicate<RowT, RowT> cond) {
+ NestedLoopJoinNode(ExecutionContext<RowT> ctx, BiPredicate<RowT, RowT> cond) {
super(ctx);
this.cond = cond;
@@ -74,98 +58,14 @@
/** {@inheritDoc} */
@Override
- public void request(int rowsCnt) throws Exception {
- assert !nullOrEmpty(sources()) && sources().size() == 2;
- assert rowsCnt > 0 && requested == 0;
-
- checkState();
-
- requested = rowsCnt;
-
- if (!inLoop) {
- context().execute(this::doJoin, this::onError);
- }
- }
-
- private void doJoin() throws Exception {
- checkState();
-
- join();
- }
-
- /** {@inheritDoc} */
- @Override
protected void rewindInternal() {
- requested = 0;
- waitingLeft = 0;
- waitingRight = 0;
-
rightMaterialized.clear();
- leftInBuf.clear();
+
+ super.rewindInternal();
}
- /** {@inheritDoc} */
@Override
- protected Downstream<RowT> requestDownstream(int idx) {
- if (idx == 0) {
- return new Downstream<RowT>() {
- /** {@inheritDoc} */
- @Override
- public void push(RowT row) throws Exception {
- pushLeft(row);
- }
-
- /** {@inheritDoc} */
- @Override
- public void end() throws Exception {
- endLeft();
- }
-
- /** {@inheritDoc} */
- @Override
- public void onError(Throwable e) {
- NestedLoopJoinNode.this.onError(e);
- }
- };
- } else if (idx == 1) {
- return new Downstream<RowT>() {
- /** {@inheritDoc} */
- @Override
- public void push(RowT row) throws Exception {
- pushRight(row);
- }
-
- /** {@inheritDoc} */
- @Override
- public void end() throws Exception {
- endRight();
- }
-
- /** {@inheritDoc} */
- @Override
- public void onError(Throwable e) {
- NestedLoopJoinNode.this.onError(e);
- }
- };
- }
-
- throw new IndexOutOfBoundsException();
- }
-
- private void pushLeft(RowT row) throws Exception {
- assert downstream() != null;
- assert waitingLeft > 0;
-
- checkState();
-
- waitingLeft--;
-
- leftInBuf.add(row);
-
- join();
- }
-
- private void pushRight(RowT row) throws Exception {
+ protected void pushRight(RowT row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;
@@ -180,38 +80,6 @@
}
}
- private void endLeft() throws Exception {
- assert downstream() != null;
- assert waitingLeft > 0;
-
- checkState();
-
- waitingLeft = NOT_WAITING;
-
- join();
- }
-
- private void endRight() throws Exception {
- assert downstream() != null;
- assert waitingRight > 0;
-
- checkState();
-
- waitingRight = NOT_WAITING;
-
- join();
- }
-
- protected Node<RowT> leftSource() {
- return sources().get(0);
- }
-
- protected Node<RowT> rightSource() {
- return sources().get(1);
- }
-
- protected abstract void join() throws Exception;
-
/**
* Create.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -257,8 +125,6 @@
}
private static class InnerJoin<RowT> extends NestedLoopJoinNode<RowT> {
- private RowT left;
-
private int rightIdx;
/**
@@ -275,7 +141,6 @@
/** {@inheritDoc} */
@Override
protected void rewindInternal() {
- left = null;
rightIdx = 0;
super.rewindInternal();
@@ -335,8 +200,6 @@
/** Whether current left row was matched or not. */
private boolean matched;
- private RowT left;
-
private int rightIdx;
/**
@@ -360,7 +223,6 @@
@Override
protected void rewindInternal() {
matched = false;
- left = null;
rightIdx = 0;
super.rewindInternal();
@@ -430,15 +292,13 @@
}
private static class RightJoin<RowT> extends NestedLoopJoinNode<RowT> {
- /** Right row factory. */
+ /** Left row factory. */
private final RowHandler.RowFactory<RowT> leftRowFactory;
private @Nullable BitSet rightNotMatchedIndexes;
private int lastPushedInd;
- private RowT left;
-
private int rightIdx;
/**
@@ -462,7 +322,6 @@
/** {@inheritDoc} */
@Override
protected void rewindInternal() {
- left = null;
rightNotMatchedIndexes = null;
lastPushedInd = 0;
rightIdx = 0;
@@ -573,8 +432,6 @@
private int lastPushedInd;
- private RowT left;
-
private int rightIdx;
/**
@@ -601,7 +458,6 @@
/** {@inheritDoc} */
@Override
protected void rewindInternal() {
- left = null;
leftMatched = false;
rightNotMatchedIndexes = null;
lastPushedInd = 0;
@@ -714,8 +570,6 @@
}
private static class SemiJoin<RowT> extends NestedLoopJoinNode<RowT> {
- private RowT left;
-
private int rightIdx;
/**
@@ -732,7 +586,6 @@
/** {@inheritDoc} */
@Override
protected void rewindInternal() {
- left = null;
rightIdx = 0;
super.rewindInternal();
@@ -786,8 +639,6 @@
}
private static class AntiJoin<RowT> extends NestedLoopJoinNode<RowT> {
- private RowT left;
-
private int rightIdx;
/**
@@ -803,7 +654,6 @@
@Override
protected void rewindInternal() {
- left = null;
rightIdx = 0;
super.rewindInternal();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
index 6a980e9..158c157 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
@@ -22,6 +22,7 @@
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
@@ -89,6 +90,12 @@
/** {@inheritDoc} */
@Override
+ public IgniteRel visit(IgniteHashJoin rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override
public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
return processNode(rel);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index a3cd8b3..e362a3c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -50,6 +50,7 @@
import org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToHashIndexSpoolRule;
import org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToSortedIndexSpoolRule;
import org.apache.ignite.internal.sql.engine.rule.HashAggregateConverterRule;
+import org.apache.ignite.internal.sql.engine.rule.HashJoinConverterRule;
import org.apache.ignite.internal.sql.engine.rule.LogicalScanConverterRule;
import org.apache.ignite.internal.sql.engine.rule.MergeJoinConverterRule;
import org.apache.ignite.internal.sql.engine.rule.NestedLoopJoinConverterRule;
@@ -227,6 +228,7 @@
CorrelateToNestedLoopRule.INSTANCE,
NestedLoopJoinConverterRule.INSTANCE,
+ HashJoinConverterRule.INSTANCE,
ValuesConverterRule.INSTANCE,
LogicalScanConverterRule.INDEX_SCAN,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteHashJoin.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteHashJoin.java
new file mode 100644
index 0000000..5ea0388
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteHashJoin.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rel;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+
+/**
+ * Relational operator that represent hash join algo.
+ */
+public class IgniteHashJoin extends AbstractIgniteJoin {
+ private static final String REL_TYPE_NAME = "HashJoin";
+
+ public IgniteHashJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right,
+ RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+ super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+ }
+
+ /** Constructor. */
+ public IgniteHashJoin(RelInput input) {
+ this(input.getCluster(),
+ input.getTraitSet().replace(IgniteConvention.INSTANCE),
+ input.getInputs().get(0),
+ input.getInputs().get(1),
+ input.getExpression("condition"),
+ Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)),
+ input.getEnum("joinType", JoinRelType.class));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ IgniteCostFactory costFactory = (IgniteCostFactory) planner.getCostFactory();
+
+ double leftRowCount = mq.getRowCount(getLeft());
+ double rightRowCount = mq.getRowCount(getRight());
+
+ if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) {
+ return planner.getCostFactory().makeInfiniteCost();
+ }
+
+ double rowCount = leftRowCount + rightRowCount;
+
+ int rightKeysSize = joinInfo.rightKeys.size();
+
+ double rightSize = rightRowCount * IgniteCost.AVERAGE_FIELD_SIZE * getRight().getRowType().getFieldCount();
+
+ double distRightRows = Util.first(mq.getDistinctRowCount(right, ImmutableBitSet.of(joinInfo.rightKeys), null), 0.9 * rightRowCount);
+
+ rightSize += distRightRows * rightKeysSize * IgniteCost.AVERAGE_FIELD_SIZE;
+
+ return costFactory.makeCost(rowCount, rowCount * IgniteCost.HASH_LOOKUP_COST, 0, rightSize, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType,
+ boolean semiJoinDone) {
+ return new IgniteHashJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteHashJoin(cluster, getTraitSet(), inputs.get(0), inputs.get(1), getCondition(),
+ getVariablesSet(), getJoinType());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getRelTypeName() {
+ return REL_TYPE_NAME;
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
index ddaf73d..5f4238c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
@@ -57,6 +57,11 @@
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}.
*/
+ T visit(IgniteHashJoin rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+ */
T visit(IgniteCorrelatedNestedLoopJoin rel);
/**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashJoinConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashJoinConverterRule.java
new file mode 100644
index 0000000..c2f7ae7
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashJoinConverterRule.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
+
+/**
+ * Hash join converter.
+ */
+public class HashJoinConverterRule extends AbstractIgniteConverterRule<LogicalJoin> {
+ public static final RelOptRule INSTANCE = new HashJoinConverterRule();
+
+ /**
+ * Creates a converter.
+ */
+ public HashJoinConverterRule() {
+ super(LogicalJoin.class, "HashJoinConverter");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalJoin logicalJoin = call.rel(0);
+
+ return !nullOrEmpty(logicalJoin.analyzeCondition().pairs())
+ && logicalJoin.analyzeCondition().isEqui() && acceptableConditions(logicalJoin.getCondition());
+ }
+
+ private static boolean acceptableConditions(RexNode node) {
+ RexVisitor<Void> v = new RexVisitorImpl<>(true) {
+ @Override
+ public Void visitCall(RexCall call) {
+ SqlKind opKind = call.getOperator().getKind();
+ if (opKind != SqlKind.EQUALS && opKind != SqlKind.AND) {
+ throw Util.FoundOne.NULL;
+ }
+ return super.visitCall(call);
+ }
+ };
+
+ try {
+ node.accept(v);
+
+ return true;
+ } catch (Util.FoundOne e) {
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalJoin rel) {
+ RelOptCluster cluster = rel.getCluster();
+ RelTraitSet outTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet leftInTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet rightInTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelNode left = convert(rel.getLeft(), leftInTraits);
+ RelNode right = convert(rel.getRight(), rightInTraits);
+
+ return new IgniteHashJoin(cluster, outTraits, left, right, rel.getCondition(), rel.getVariablesSet(), rel.getJoinType());
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 569f286..3e32ab8 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -162,6 +162,20 @@
}
@Test
+ public void testHashJoin() {
+ addNodes("N0", "N1", "N2", "N3", "N4");
+
+ addTable("T1", "N1");
+ addTable("T2", "N1");
+ addTable("T2", "N2");
+
+ setRowCount("T1", 200);
+ setRowCount("T2", 100);
+
+ testRunner.runTest(this::initSchema, "hash_join.test");
+ }
+
+ @Test
public void testTableIdentity() {
addNodes("N0", "N1", "N2");
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
new file mode 100644
index 0000000..3be2f2f
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.calcite.rel.core.JoinRelType.ANTI;
+import static org.apache.calcite.rel.core.JoinRelType.FULL;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.calcite.rel.core.JoinRelType.SEMI;
+import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.Test;
+
+/** Common join execution test. */
+public abstract class AbstractJoinExecutionTest extends AbstractExecutionTest<Object[]> {
+ abstract JoinAlgo joinAlgo();
+
+ @Test
+ public void joinEmptyTables() {
+ verifyJoin(EMPTY, EMPTY, INNER, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, EMPTY, LEFT, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, EMPTY, RIGHT, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, EMPTY, FULL, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, EMPTY, SEMI, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, EMPTY, ANTI, EMPTY, joinAlgo());
+ }
+
+ @Test
+ public void joinEmptyLeftTable() {
+ Object[][] right = {
+ {1, "Core"},
+ {1, "OLD_Core"},
+ {2, "SQL"}
+ };
+
+ verifyJoin(EMPTY, right, INNER, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, right, LEFT, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, right, RIGHT, new Object[][]{
+ {null, null, "Core"},
+ {null, null, "OLD_Core"},
+ {null, null, "SQL"}
+ }, joinAlgo());
+ verifyJoin(EMPTY, right, FULL, new Object[][]{
+ {null, null, "Core"},
+ {null, null, "OLD_Core"},
+ {null, null, "SQL"}
+ }, joinAlgo());
+ verifyJoin(EMPTY, right, SEMI, EMPTY, joinAlgo());
+ verifyJoin(EMPTY, right, ANTI, EMPTY, joinAlgo());
+ }
+
+ @Test
+ public void joinEmptyRightTable() {
+ Object[][] left = {
+ {1, "Roman", null},
+ {2, "Igor", 1},
+ {3, "Alexey", 2}
+ };
+
+ verifyJoin(left, EMPTY, INNER, EMPTY, joinAlgo());
+ verifyJoin(left, EMPTY, LEFT, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", null},
+ {3, "Alexey", null}
+ }, joinAlgo());
+ verifyJoin(left, EMPTY, RIGHT, EMPTY, joinAlgo());
+ verifyJoin(left, EMPTY, FULL, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", null},
+ {3, "Alexey", null}
+ }, joinAlgo());
+ verifyJoin(left, EMPTY, SEMI, EMPTY, joinAlgo());
+ verifyJoin(left, EMPTY, ANTI, new Object[][]{
+ {1, "Roman"},
+ {2, "Igor"},
+ {3, "Alexey"}
+ }, joinAlgo());
+ }
+
+ @Test
+ public void joinOneToMany() {
+ Object[][] left = {
+ {1, "Roman", null},
+ {2, "Igor", 1},
+ {3, "Alexey", 2}
+ };
+
+ Object[][] right = {
+ {1, "Core"},
+ {1, "OLD_Core"},
+ {2, "SQL"},
+ {3, "Arch"}
+ };
+
+ verifyJoin(left, right, INNER, new Object[][]{
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"}
+ }, joinAlgo());
+ verifyJoin(left, right, LEFT, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"}
+ }, joinAlgo());
+ verifyJoin(left, right, RIGHT, new Object[][]{
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"},
+ {null, null, "Arch"}
+ }, joinAlgo());
+ verifyJoin(left, right, FULL, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"},
+ {null, null, "Arch"}
+ }, joinAlgo());
+ verifyJoin(left, right, SEMI, new Object[][]{
+ {2, "Igor"},
+ {3, "Alexey"}
+ }, joinAlgo());
+ verifyJoin(left, right, ANTI, new Object[][]{
+ {1, "Roman"}
+ }, joinAlgo());
+ }
+
+ @Test
+ public void testLeftJoin() {
+ // select e.id, e.name, d.name as dep_name
+ // from emp e
+ // left join dep d
+ // on e.depno = d.depno
+
+ Object[][] persons = {
+ new Object[]{0, "Igor", 1},
+ new Object[]{1, "Roman", 2},
+ new Object[]{2, "Ivan", null},
+ new Object[]{3, "Alexey", 1}
+ };
+
+ Object[][] deps = {
+ new Object[]{1, "Core"},
+ new Object[]{2, "SQL"}
+ };
+
+ verifyJoin(persons, deps, LEFT, new Object[][]{
+ {0, "Igor", "Core"},
+ {1, "Roman", "SQL"},
+ {2, "Ivan", null},
+ {3, "Alexey", "Core"},
+ }, joinAlgo());
+ }
+
+ @Test
+ public void testSemiJoin() {
+ // select d.name as dep_name
+ // from dep d
+ // semi join emp e
+ // on e.depno = d.depno
+
+ Object[][] persons = {
+ new Object[]{1, "Igor", 0},
+ new Object[]{2, "Roman", 1},
+ new Object[]{null, "Ivan", 2},
+ new Object[]{1, "Alexey", 3}
+ };
+
+ Object[][] deps = {
+ new Object[]{1, "Core", 1},
+ new Object[]{2, "SQL", 2},
+ new Object[]{3, "QA", 3}
+ };
+
+ verifyJoin(deps, persons, SEMI, new Object[][]{
+ {1, "Core"},
+ {2, "SQL"},
+ }, joinAlgo());
+ }
+
+ @Test
+ public void testAntiJoin() {
+ // select d.name as dep_name
+ // from dep d
+ // anti join emp e
+ // on e.depno = d.depno
+
+ Object[][] persons = {
+ new Object[]{1, "Igor", },
+ new Object[]{2, "Roman", 1},
+ new Object[]{null, "Ivan", 2},
+ new Object[]{1, "Alexey", 3}
+ };
+
+ Object[][] deps = {
+ new Object[]{1, "Core", 1},
+ new Object[]{2, "SQL", 2},
+ new Object[]{3, "QA", 3}
+ };
+
+ verifyJoin(deps, persons, ANTI, new Object[][]{
+ {3, "QA"}
+ }, joinAlgo());
+ }
+
+ @Test
+ public void joinOneToMany2() {
+ Object[][] left = {
+ {1, "Roman", null},
+ {2, "Igor", 1},
+ {3, "Alexey", 2},
+ {4, "Ivan", 4},
+ {5, "Taras", 5},
+ {6, "Lisa", 6}
+ };
+
+ Object[][] right = {
+ {1, "Core"},
+ {1, "OLD_Core"},
+ {2, "SQL"},
+ {3, "QA"},
+ {5, "Arch"}
+ };
+
+ verifyJoin(left, right, INNER, new Object[][]{
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"},
+ {5, "Taras", "Arch"}
+ }, joinAlgo());
+ verifyJoin(left, right, LEFT, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"},
+ {4, "Ivan", null},
+ {5, "Taras", "Arch"},
+ {6, "Lisa", null}
+ }, joinAlgo());
+ verifyJoin(left, right, RIGHT, new Object[][]{
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"},
+ {5, "Taras", "Arch"},
+ {null, null, "QA"}
+ }, joinAlgo());
+ verifyJoin(left, right, FULL, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Alexey", "SQL"},
+ {4, "Ivan", null},
+ {5, "Taras", "Arch"},
+ {6, "Lisa", null},
+ {null, null, "QA"}
+ }, joinAlgo());
+ verifyJoin(left, right, SEMI, new Object[][]{
+ {2, "Igor"},
+ {3, "Alexey"},
+ {5, "Taras"}
+ }, joinAlgo());
+ verifyJoin(left, right, ANTI, new Object[][]{
+ {1, "Roman"},
+ {4, "Ivan"},
+ {6, "Lisa"}
+ }, joinAlgo());
+ }
+
+ @Test
+ public void joinManyToMany() {
+ Object[][] left = {
+ {1, "Roman", null},
+ {2, "Igor", 1},
+ {3, "Taras", 1},
+ {4, "Alexey", 2},
+ {5, "Ivan", 4},
+ {6, "Andrey", 4}
+ };
+
+ Object[][] right = {
+ {1, "Core"},
+ {1, "OLD_Core"},
+ {2, "SQL"},
+ {3, "Arch"},
+ {4, "QA"},
+ {4, "OLD_QA"},
+ };
+
+ verifyJoin(left, right, INNER, new Object[][]{
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Taras", "Core"},
+ {3, "Taras", "OLD_Core"},
+ {4, "Alexey", "SQL"},
+ {5, "Ivan", "OLD_QA"},
+ {5, "Ivan", "QA"},
+ {6, "Andrey", "OLD_QA"},
+ {6, "Andrey", "QA"}
+ }, joinAlgo());
+ verifyJoin(left, right, LEFT, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Taras", "Core"},
+ {3, "Taras", "OLD_Core"},
+ {4, "Alexey", "SQL"},
+ {5, "Ivan", "OLD_QA"},
+ {5, "Ivan", "QA"},
+ {6, "Andrey", "OLD_QA"},
+ {6, "Andrey", "QA"}
+ }, joinAlgo());
+ verifyJoin(left, right, RIGHT, new Object[][]{
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Taras", "Core"},
+ {3, "Taras", "OLD_Core"},
+ {4, "Alexey", "SQL"},
+ {5, "Ivan", "OLD_QA"},
+ {5, "Ivan", "QA"},
+ {6, "Andrey", "OLD_QA"},
+ {6, "Andrey", "QA"},
+ {null, null, "Arch"}
+ }, joinAlgo());
+ verifyJoin(left, right, FULL, new Object[][]{
+ {1, "Roman", null},
+ {2, "Igor", "Core"},
+ {2, "Igor", "OLD_Core"},
+ {3, "Taras", "Core"},
+ {3, "Taras", "OLD_Core"},
+ {4, "Alexey", "SQL"},
+ {5, "Ivan", "OLD_QA"},
+ {5, "Ivan", "QA"},
+ {6, "Andrey", "OLD_QA"},
+ {6, "Andrey", "QA"},
+ {null, null, "Arch"}
+ }, joinAlgo());
+ verifyJoin(left, right, SEMI, new Object[][]{
+ {2, "Igor"},
+ {3, "Taras"},
+ {4, "Alexey"},
+ {5, "Ivan"},
+ {6, "Andrey"},
+ }, joinAlgo());
+ verifyJoin(left, right, ANTI, new Object[][]{
+ {1, "Roman"}
+ }, joinAlgo());
+ }
+
+ enum JoinAlgo {
+ HASH,
+ NESTED_LOOP
+ }
+
+ /**
+ * Creates execution tree and executes it. Then compares the result of the execution with the given one.
+ *
+ * @param left Data for left table.
+ * @param right Data for right table.
+ * @param joinType Join type.
+ * @param expRes Expected result.
+ */
+ private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes, JoinAlgo algo) {
+ ExecutionContext<Object[]> ctx = executionContext(true);
+
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+
+ RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+ NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+ ScanNode<Object[]> leftNode = new ScanNode<>(ctx, Arrays.asList(left));
+
+ RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+ ScanNode<Object[]> rightNode = new ScanNode<>(ctx, Arrays.asList(right));
+
+ RelDataType outType;
+ if (setOf(SEMI, ANTI).contains(joinType)) {
+ outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+ NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+ } else {
+ outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+ NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.INT32, NativeTypes.STRING));
+ }
+
+ RowHandler<Object[]> hnd = ctx.rowHandler();
+
+ AbstractRightMaterializedJoinNode<Object[]> join;
+
+ if (algo == JoinAlgo.NESTED_LOOP) {
+ join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, joinType,
+ (r1, r2) -> getFieldFromBiRows(hnd, 2, r1, r2) == getFieldFromBiRows(hnd, 3, r1, r2));
+ } else {
+ join = HashJoinNode.create(ctx, outType, leftType, rightType, joinType,
+ JoinInfo.of(ImmutableIntList.of(2), ImmutableIntList.of(0)));
+ }
+
+ join.register(asList(leftNode, rightNode));
+
+ ProjectNode<Object[]> project;
+ SortNode<Object[]> sortNode;
+ if (setOf(SEMI, ANTI).contains(joinType)) {
+ project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1]});
+ RelCollation collation = RelCollations.of(ImmutableIntList.of(0, 1));
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+ sortNode = new SortNode<>(ctx, cmp);
+ } else {
+ project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
+ RelCollation collation = RelCollations.of(ImmutableIntList.of(0, 1, 4));
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+ sortNode = new SortNode<>(ctx, cmp);
+ }
+
+ sortNode.register(join);
+
+ project.register(sortNode);
+
+ // first, let's rewind just created tree -- it's how it actually being executed
+ project.rewind();
+
+ int times = 2;
+ do {
+ TestDownstream<Object[]> downstream = new TestDownstream<>();
+ project.onRegister(downstream);
+
+ ctx.execute(() -> project.request(1024), project::onError);
+
+ Object[][] res = await(downstream.result()).toArray(EMPTY);
+
+ assertArrayEquals(expRes, res);
+
+ // now let's rewind and restart test to check whether all state has been correctly reset
+ project.rewind();
+ } while (times-- > 0);
+ }
+
+ /**
+ * Creates {@link Set set} from provided items.
+ *
+ * @param items Items.
+ * @return New set.
+ */
+ @SafeVarargs
+ private static <T> Set<T> setOf(T... items) {
+ return new HashSet<>(Arrays.asList(items));
+ }
+
+ @Override
+ protected RowHandler<Object[]> rowHandler() {
+ return ArrayRowHandler.INSTANCE;
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 162b8b0..24bc4fc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -19,12 +19,9 @@
import static java.lang.Math.max;
import static java.lang.Math.min;
-import static org.apache.calcite.rel.core.JoinRelType.ANTI;
import static org.apache.calcite.rel.core.JoinRelType.FULL;
import static org.apache.calcite.rel.core.JoinRelType.INNER;
import static org.apache.calcite.rel.core.JoinRelType.LEFT;
-import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
-import static org.apache.calcite.rel.core.JoinRelType.SEMI;
import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
@@ -223,64 +220,6 @@
}
@Test
- public void testRightJoin() {
- // select e.id, e.name, d.name as dep_name
- // from dep d
- // right join emp e
- // on e.depno = d.depno
-
- ExecutionContext<Object[]> ctx = executionContext(true);
-
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
- new Object[]{0, "Igor", 1},
- new Object[]{1, "Roman", 2},
- new Object[]{2, "Ivan", null},
- new Object[]{3, "Alexey", 1}
- ));
-
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
- new Object[]{1, "Core"},
- new Object[]{2, "SQL"},
- new Object[]{3, "QA"}
- ));
-
- IgniteTypeFactory tf = ctx.getTypeFactory();
-
- RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
- RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-
- RowHandler<Object[]> hnd = ctx.rowHandler();
-
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, RIGHT,
- (r1, r2) -> getFieldFromBiRows(hnd, 0, r1, r2) == getFieldFromBiRows(hnd, 4, r1, r2));
- join.register(asList(deps, persons));
-
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]});
- project.register(join);
-
- RootNode<Object[]> node = new RootNode<>(ctx);
- node.register(project);
-
- assert node.hasNext();
-
- ArrayList<Object[]> rows = new ArrayList<>();
-
- while (node.hasNext()) {
- rows.add(node.next());
- }
-
- assertEquals(4, rows.size());
-
- assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
- assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(1));
- assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(2));
- assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(3));
- }
-
- @Test
public void testFullOuterJoin() {
// select e.id, e.name, d.name as dep_name
// from emp e
@@ -339,117 +278,6 @@
assertArrayEquals(new Object[]{null, null, "QA"}, rows.get(4));
}
- @Test
- public void testSemiJoin() {
- // select d.name as dep_name
- // from dep d
- // semi join emp e
- // on e.depno = d.depno
-
- ExecutionContext<Object[]> ctx = executionContext(true);
-
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
- new Object[]{0, "Igor", 1},
- new Object[]{1, "Roman", 2},
- new Object[]{2, "Ivan", null},
- new Object[]{3, "Alexey", 1}
- ));
-
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
- new Object[]{1, "Core"},
- new Object[]{2, "SQL"},
- new Object[]{3, "QA"}
- ));
-
- IgniteTypeFactory tf = ctx.getTypeFactory();
-
- RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-
- RowHandler<Object[]> hnd = ctx.rowHandler();
-
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, SEMI,
- (r1, r2) -> getFieldFromBiRows(hnd, 0, r1, r2) == getFieldFromBiRows(hnd, 4, r1, r2));
- join.register(asList(deps, persons));
-
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[1]});
- project.register(join);
-
- RootNode<Object[]> node = new RootNode<>(ctx);
- node.register(project);
-
- assert node.hasNext();
-
- ArrayList<Object[]> rows = new ArrayList<>();
-
- while (node.hasNext()) {
- rows.add(node.next());
- }
-
- assertEquals(2, rows.size());
-
- assertArrayEquals(new Object[]{"Core"}, rows.get(0));
- assertArrayEquals(new Object[]{"SQL"}, rows.get(1));
- }
-
- @Test
- public void testAntiJoin() {
- // select d.name as dep_name
- // from dep d
- // anti join emp e
- // on e.depno = d.depno
-
- ExecutionContext<Object[]> ctx = executionContext(true);
-
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
- new Object[]{0, "Igor", 1},
- new Object[]{1, "Roman", 2},
- new Object[]{2, "Ivan", null},
- new Object[]{3, "Alexey", 1}
- ));
-
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
- new Object[]{1, "Core"},
- new Object[]{2, "SQL"},
- new Object[]{3, "QA"}
- ));
-
- IgniteTypeFactory tf = ctx.getTypeFactory();
-
- RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-
- RowHandler<Object[]> hnd = ctx.rowHandler();
-
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, ANTI,
- (r1, r2) -> getFieldFromBiRows(hnd, 0, r1, r2) == getFieldFromBiRows(hnd, 4, r1, r2));
- join.register(asList(deps, persons));
-
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[1]});
- project.register(join);
-
- RootNode<Object[]> node = new RootNode<>(ctx);
- node.register(project);
-
- assert node.hasNext();
-
- ArrayList<Object[]> rows = new ArrayList<>();
-
- while (node.hasNext()) {
- rows.add(node.next());
- }
-
- assertEquals(1, rows.size());
-
- assertArrayEquals(new Object[]{"QA"}, rows.get(0));
- }
-
/**
* TestCorrelatedNestedLoopJoin.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
new file mode 100644
index 0000000..3412ce7
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.Test;
+
+/** Yash join execution tests. */
+public class HashJoinExecutionTest extends AbstractJoinExecutionTest {
+ @Override
+ JoinAlgo joinAlgo() {
+ return JoinAlgo.HASH;
+ }
+
+ @Test
+ public void testHashJoinRewind() {
+ ExecutionContext<Object[]> ctx = executionContext(true);
+
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ new Object[]{0, "Igor", 1},
+ new Object[]{1, "Roman", 2},
+ new Object[]{2, "Ivan", 5},
+ new Object[]{3, "Alexey", 1}
+ ));
+
+ ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+ new Object[]{1, "Core"},
+ new Object[]{2, "SQL"},
+ new Object[]{3, "QA"}
+ ));
+
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+
+ RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+ NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+ RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+ RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+ NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+
+ AbstractRightMaterializedJoinNode<Object[]> join = HashJoinNode.create(ctx, outType, leftType, rightType, RIGHT,
+ JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(2)));
+
+ join.register(asList(deps, persons));
+
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]});
+ project.register(join);
+
+ RootRewindable<Object[]> node = new RootRewindable<>(ctx);
+ node.register(project);
+
+ assert node.hasNext();
+
+ ArrayList<Object[]> rows = new ArrayList<>();
+
+ while (node.hasNext()) {
+ rows.add(node.next());
+ }
+
+ assertEquals(4, rows.size());
+
+ Object[][] expected = {
+ {0, "Igor", "Core"},
+ {3, "Alexey", "Core"},
+ {1, "Roman", "SQL"},
+ {2, "Ivan", null}
+ };
+
+ assert2DimArrayEquals(expected, rows);
+
+ List<Object[]> depsRes = new ArrayList<>();
+ depsRes.add(new Object[]{5, "QA"});
+
+ deps = new ScanNode<>(ctx, depsRes);
+
+ join.register(asList(deps, persons));
+
+ node.rewind();
+
+ assert node.hasNext();
+
+ ArrayList<Object[]> rowsAfterRewind = new ArrayList<>();
+
+ while (node.hasNext()) {
+ rowsAfterRewind.add(node.next());
+ }
+
+ assertEquals(4, rowsAfterRewind.size());
+
+ Object[][] expectedAfterRewind = {
+ {2, "Ivan", "QA"},
+ {1, "Roman", null},
+ {0, "Igor", null},
+ {3, "Alexey", null},
+ };
+
+ assert2DimArrayEquals(expectedAfterRewind, rowsAfterRewind);
+ }
+
+ static void assert2DimArrayEquals(Object[][] expected, ArrayList<Object[]> actual) {
+ assertEquals(expected.length, actual.size(), "expected length: " + expected.length + ", actual length: " + actual.size());
+
+ int length = expected.length;
+
+ for (int i = 0; i < length; ++i) {
+ Object[] exp = expected[i];
+ Object[] act = actual.get(i);
+
+ assertEquals(exp.length, act.length, "expected length: " + exp.length + ", actual length: " + act.length);
+ assertArrayEquals(exp, act);
+ }
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index d66af19..e5c97dd 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -17,362 +17,10 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import static org.apache.calcite.rel.core.JoinRelType.ANTI;
-import static org.apache.calcite.rel.core.JoinRelType.FULL;
-import static org.apache.calcite.rel.core.JoinRelType.INNER;
-import static org.apache.calcite.rel.core.JoinRelType.LEFT;
-import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
-import static org.apache.calcite.rel.core.JoinRelType.SEMI;
-import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
-import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.type.NativeTypes;
-import org.junit.jupiter.api.Test;
-
-/**
- * NestedLoopJoinExecutionTest.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
-public class NestedLoopJoinExecutionTest extends AbstractExecutionTest<Object[]> {
- @Test
- public void joinEmptyTables() {
- verifyJoin(EMPTY, EMPTY, INNER, EMPTY);
- verifyJoin(EMPTY, EMPTY, LEFT, EMPTY);
- verifyJoin(EMPTY, EMPTY, RIGHT, EMPTY);
- verifyJoin(EMPTY, EMPTY, FULL, EMPTY);
- verifyJoin(EMPTY, EMPTY, SEMI, EMPTY);
- verifyJoin(EMPTY, EMPTY, ANTI, EMPTY);
- }
-
- @Test
- public void joinEmptyLeftTable() {
- Object[][] right = {
- {1, "Core"},
- {1, "OLD_Core"},
- {2, "SQL"}
- };
-
- verifyJoin(EMPTY, right, INNER, EMPTY);
- verifyJoin(EMPTY, right, LEFT, EMPTY);
- verifyJoin(EMPTY, right, RIGHT, new Object[][]{
- {null, null, "Core"},
- {null, null, "OLD_Core"},
- {null, null, "SQL"}
- });
- verifyJoin(EMPTY, right, FULL, new Object[][]{
- {null, null, "Core"},
- {null, null, "OLD_Core"},
- {null, null, "SQL"}
- });
- verifyJoin(EMPTY, right, SEMI, EMPTY);
- verifyJoin(EMPTY, right, ANTI, EMPTY);
- }
-
- @Test
- public void joinEmptyRightTable() {
- Object[][] left = {
- {1, "Roman", null},
- {2, "Igor", 1},
- {3, "Alexey", 2}
- };
-
- verifyJoin(left, EMPTY, INNER, EMPTY);
- verifyJoin(left, EMPTY, LEFT, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", null},
- {3, "Alexey", null}
- });
- verifyJoin(left, EMPTY, RIGHT, EMPTY);
- verifyJoin(left, EMPTY, FULL, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", null},
- {3, "Alexey", null}
- });
- verifyJoin(left, EMPTY, SEMI, EMPTY);
- verifyJoin(left, EMPTY, ANTI, new Object[][]{
- {1, "Roman"},
- {2, "Igor"},
- {3, "Alexey"}
- });
- }
-
- @Test
- public void joinOneToMany() {
- Object[][] left = {
- {1, "Roman", null},
- {2, "Igor", 1},
- {3, "Alexey", 2}
- };
-
- Object[][] right = {
- {1, "Core"},
- {1, "OLD_Core"},
- {2, "SQL"},
- {3, "Arch"}
- };
-
- verifyJoin(left, right, INNER, new Object[][]{
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"}
- });
- verifyJoin(left, right, LEFT, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"}
- });
- verifyJoin(left, right, RIGHT, new Object[][]{
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"},
- {null, null, "Arch"}
- });
- verifyJoin(left, right, FULL, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"},
- {null, null, "Arch"}
- });
- verifyJoin(left, right, SEMI, new Object[][]{
- {2, "Igor"},
- {3, "Alexey"}
- });
- verifyJoin(left, right, ANTI, new Object[][]{
- {1, "Roman"}
- });
- }
-
- @Test
- public void joinOneToMany2() {
- Object[][] left = {
- {1, "Roman", null},
- {2, "Igor", 1},
- {3, "Alexey", 2},
- {4, "Ivan", 4},
- {5, "Taras", 5},
- {6, "Lisa", 6}
- };
-
- Object[][] right = {
- {1, "Core"},
- {1, "OLD_Core"},
- {2, "SQL"},
- {3, "QA"},
- {5, "Arch"}
- };
-
- verifyJoin(left, right, INNER, new Object[][]{
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"},
- {5, "Taras", "Arch"}
- });
- verifyJoin(left, right, LEFT, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"},
- {4, "Ivan", null},
- {5, "Taras", "Arch"},
- {6, "Lisa", null}
- });
- verifyJoin(left, right, RIGHT, new Object[][]{
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"},
- {5, "Taras", "Arch"},
- {null, null, "QA"}
- });
- verifyJoin(left, right, FULL, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Alexey", "SQL"},
- {4, "Ivan", null},
- {5, "Taras", "Arch"},
- {6, "Lisa", null},
- {null, null, "QA"}
- });
- verifyJoin(left, right, SEMI, new Object[][]{
- {2, "Igor"},
- {3, "Alexey"},
- {5, "Taras"}
- });
- verifyJoin(left, right, ANTI, new Object[][]{
- {1, "Roman"},
- {4, "Ivan"},
- {6, "Lisa"}
- });
- }
-
- @Test
- public void joinManyToMany() {
- Object[][] left = {
- {1, "Roman", null},
- {2, "Igor", 1},
- {3, "Taras", 1},
- {4, "Alexey", 2},
- {5, "Ivan", 4},
- {6, "Andrey", 4}
- };
-
- Object[][] right = {
- {1, "Core"},
- {1, "OLD_Core"},
- {2, "SQL"},
- {3, "Arch"},
- {4, "QA"},
- {4, "OLD_QA"},
- };
-
- verifyJoin(left, right, INNER, new Object[][]{
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Taras", "Core"},
- {3, "Taras", "OLD_Core"},
- {4, "Alexey", "SQL"},
- {5, "Ivan", "QA"},
- {5, "Ivan", "OLD_QA"},
- {6, "Andrey", "QA"},
- {6, "Andrey", "OLD_QA"}
- });
- verifyJoin(left, right, LEFT, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Taras", "Core"},
- {3, "Taras", "OLD_Core"},
- {4, "Alexey", "SQL"},
- {5, "Ivan", "QA"},
- {5, "Ivan", "OLD_QA"},
- {6, "Andrey", "QA"},
- {6, "Andrey", "OLD_QA"}
- });
- verifyJoin(left, right, RIGHT, new Object[][]{
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Taras", "Core"},
- {3, "Taras", "OLD_Core"},
- {4, "Alexey", "SQL"},
- {5, "Ivan", "QA"},
- {5, "Ivan", "OLD_QA"},
- {6, "Andrey", "QA"},
- {6, "Andrey", "OLD_QA"},
- {null, null, "Arch"}
- });
- verifyJoin(left, right, FULL, new Object[][]{
- {1, "Roman", null},
- {2, "Igor", "Core"},
- {2, "Igor", "OLD_Core"},
- {3, "Taras", "Core"},
- {3, "Taras", "OLD_Core"},
- {4, "Alexey", "SQL"},
- {5, "Ivan", "QA"},
- {5, "Ivan", "OLD_QA"},
- {6, "Andrey", "QA"},
- {6, "Andrey", "OLD_QA"},
- {null, null, "Arch"}
- });
- verifyJoin(left, right, SEMI, new Object[][]{
- {2, "Igor"},
- {3, "Taras"},
- {4, "Alexey"},
- {5, "Ivan"},
- {6, "Andrey"},
- });
- verifyJoin(left, right, ANTI, new Object[][]{
- {1, "Roman"}
- });
- }
-
- /**
- * Creates execution tree and executes it. Then compares the result of the execution with the given one.
- *
- * @param left Data for left table.
- * @param right Data for right table.
- * @param joinType Join type.
- * @param expRes Expected result.
- */
- private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
- ExecutionContext<Object[]> ctx = executionContext(true);
-
- IgniteTypeFactory tf = ctx.getTypeFactory();
-
- RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- ScanNode<Object[]> leftNode = new ScanNode<>(ctx, Arrays.asList(left));
-
- RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
- ScanNode<Object[]> rightNode = new ScanNode<>(ctx, Arrays.asList(right));
-
- RelDataType outType;
- if (setOf(SEMI, ANTI).contains(joinType)) {
- outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
- } else {
- outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
- NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.INT32, NativeTypes.STRING));
- }
-
- RowHandler<Object[]> hnd = ctx.rowHandler();
-
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, joinType,
- (r1, r2) -> getFieldFromBiRows(hnd, 2, r1, r2) == getFieldFromBiRows(hnd, 3, r1, r2));
- join.register(asList(leftNode, rightNode));
-
- ProjectNode<Object[]> project;
- if (setOf(SEMI, ANTI).contains(joinType)) {
- project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1]});
- } else {
- project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
- }
- project.register(join);
-
- // first, let's rewind just created tree -- it's how it actually being executed
- project.rewind();
-
- int times = 2;
- do {
- TestDownstream<Object[]> downstream = new TestDownstream<>();
- project.onRegister(downstream);
-
- ctx.execute(() -> project.request(1024), project::onError);
-
- assertArrayEquals(expRes, await(downstream.result()).toArray(EMPTY));
-
- // now let's rewind and restart test to check whether all state has been correctly reset
- project.rewind();
- } while (times-- > 0);
- }
-
- /**
- * Creates {@link Set set} from provided items.
- *
- * @param items Items.
- * @return New set.
- */
- @SafeVarargs
- private static <T> Set<T> setOf(T... items) {
- return new HashSet<>(Arrays.asList(items));
- }
-
+/** Nested loop join execution tests. */
+public class NestedLoopJoinExecutionTest extends AbstractJoinExecutionTest {
@Override
- protected RowHandler<Object[]> rowHandler() {
- return ArrayRowHandler.INSTANCE;
+ JoinAlgo joinAlgo() {
+ return JoinAlgo.NESTED_LOOP;
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index ded0e04..5a53898 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -270,7 +270,8 @@
*/
@Test
public void noSortAppendingWithCorrectCollation() throws Exception {
- String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+ String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+ "HashJoinConverter"};
assertPlan(TestCase.CASE_16,
not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index fec1474..2d7e800 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -251,7 +251,8 @@
*/
@Test
public void noSortAppendingWithCorrectCollation() throws Exception {
- String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+ String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+ "HashJoinConverter"};
assertPlan(TestCase.CASE_16,
nodeOrAnyChild(isInstanceOf(IgniteSort.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 8953888..eb6c589 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -258,7 +258,8 @@
*/
@Test
public void noSortAppendingWithCorrectCollation() throws Exception {
- String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+ String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+ "HashJoinConverter"};
assertPlan(TestCase.CASE_16,
not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index 5f5474a..bfd8e97 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -61,7 +61,7 @@
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter"
+ "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter"
);
System.out.println("+++ " + RelOptUtil.toString(phys));
@@ -98,7 +98,7 @@
sql,
publicSchema,
Objects::nonNull,
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule"
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule", "HashJoinConverter"
);
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
index 59dea6c..a9820d6 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
@@ -116,7 +116,7 @@
String sql = "SELECT l.*, r.* FROM left_tbl l JOIN right_tbl r ON l.val0 = r.val0 AND l.val1 = r.val1";
- RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter");
+ RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter");
IgniteIndexScan scan = findFirstNode(phys, byClass(IgniteIndexScan.class));
@@ -136,7 +136,7 @@
String sql = "SELECT l.id FROM left_tbl l JOIN right_tbl r ON l.val0 = r.val0";
- RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter");
+ RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter");
IgniteIndexScan scan = findFirstNode(phys, byClass(IgniteIndexScan.class));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
index e49c9a5..621977e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
@@ -58,7 +58,7 @@
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule"
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule", "HashJoinConverter"
);
System.out.println("+++\n" + RelOptUtil.toString(phys));
@@ -89,7 +89,7 @@
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule"
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule", "HashJoinConverter"
);
IgniteHashIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteHashIndexSpool.class));
@@ -122,7 +122,7 @@
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter"
+ "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter"
);
IgniteHashIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteHashIndexSpool.class));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashJoinPlannerTest.java
new file mode 100644
index 0000000..e602e9c
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashJoinPlannerTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static org.apache.ignite.internal.sql.engine.planner.CorrelatedSubqueryPlannerTest.createTestTable;
+import static org.apache.ignite.internal.sql.engine.planner.JoinColocationPlannerTest.complexTbl;
+import static org.apache.ignite.internal.sql.engine.planner.JoinColocationPlannerTest.simpleTable;
+import static org.apache.ignite.internal.sql.engine.planner.JoinColocationPlannerTest.simpleTableHashPk;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** HashJoin planner test. */
+public class HashJoinPlannerTest extends AbstractPlannerTest {
+ private static final String[] disabledRules = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "MergeJoinConverter"};
+
+ private static final String[] joinTypes = {"LEFT", "RIGHT", "INNER", "FULL OUTER"};
+
+ /**
+ * Hash join need to preserve left collation.
+ */
+ @Test
+ public void hashJoinCheckLeftCollationsPropagation() throws Exception {
+ IgniteTable tbl1 = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
+ IgniteTable tbl2 = complexTbl("TEST_TBL_CMPLX");
+
+ IgniteSchema schema = createSchema(tbl1, tbl2);
+
+ String sql = "select t1.ID, t2.ID1 "
+ + "from TEST_TBL_CMPLX t2 "
+ + "join TEST_TBL t1 on t1.id = t2.id1 "
+ + "order by t2.ID1 NULLS LAST, t2.ID2 NULLS LAST";
+
+ // Only hash join
+ RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter",
+ "CorrelatedNestedLoopJoin", "MergeJoinConverter", "JoinCommuteRule");
+
+ IgniteHashJoin join = findFirstNode(phys, byClass(IgniteHashJoin.class));
+ List<RelNode> joinNodes = findNodes(phys, byClass(IgniteHashJoin.class));
+ List<RelNode> sortNodes = findNodes(phys, byClass(IgniteSort.class));
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, sortNodes.size(), equalTo(0));
+ assertThat(invalidPlanMsg, joinNodes.size(), equalTo(1));
+ assertThat(invalidPlanMsg, join, notNullValue());
+ }
+
+ /**
+ * Hash join erase right collation.
+ */
+ @Test
+ public void hashJoinCheckRightCollations() throws Exception {
+ IgniteTable tbl1 = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
+ IgniteTable tbl2 = complexTbl("TEST_TBL_CMPLX");
+
+ IgniteSchema schema = createSchema(tbl1, tbl2);
+
+ String sql = "select t1.ID, t2.ID1 "
+ + "from TEST_TBL t1 "
+ + "join TEST_TBL_CMPLX t2 on t1.id = t2.id1 "
+ + "order by t2.ID1 NULLS LAST, t2.ID2 NULLS LAST";
+
+ // Only hash join
+ IgniteRel phys = physicalPlan(sql, schema, "NestedLoopJoinConverter",
+ "CorrelatedNestedLoopJoin", "MergeJoinConverter", "JoinCommuteRule");
+
+ IgniteHashJoin join = findFirstNode(phys, byClass(IgniteHashJoin.class));
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, join, notNullValue());
+ assertThat(invalidPlanMsg, sortOnTopOfJoin(phys), notNullValue());
+ }
+
+ @Test
+ public void hashJoinWinsOnLeftSkewedInput() throws Exception {
+ IgniteTable thinTblSortedPk = simpleTable("THIN_TBL", 10);
+ IgniteTable thinTblHashPk = simpleTableHashPk("THIN_TBL_HASH_PK", 10);
+ IgniteTable thickTblSortedPk = simpleTable("THICK_TBL", 100_000);
+ IgniteTable thickTblHashPk = simpleTableHashPk("THICK_TBL_HASH_PK", 100_000);
+
+ IgniteSchema schema = createSchema(thinTblSortedPk, thickTblSortedPk, thinTblHashPk, thickTblHashPk);
+
+ String sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+ + "from THICK_TBL t1 " // left
+ + "join THIN_TBL t2 on t1.ID2 = t2.ID2 "; // right
+
+ assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), "JoinCommuteRule");
+
+ sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+ + "from THIN_TBL t1 " // left
+ + "join THICK_TBL t2 on t1.ID2 = t2.ID2 "; // right
+
+ assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).negate()), "JoinCommuteRule");
+
+ sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+ + "from THIN_TBL_HASH_PK t1 " // left
+ + "join THICK_TBL_HASH_PK t2 on t1.ID = t2.ID "; // right
+
+ assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), "JoinCommuteRule");
+
+ // merge join can consume less cpu in such a case
+ sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+ + "from THIN_TBL t1 " // left
+ + "join THICK_TBL t2 on t1.ID = t2.ID "; // right
+
+ assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).negate()), "JoinCommuteRule");
+ }
+
+ private static @Nullable IgniteSort sortOnTopOfJoin(IgniteRel root) {
+ List<IgniteSort> sortNodes = findNodes(root, byClass(IgniteSort.class)
+ .and(node -> node.getInputs().size() == 1 && node.getInput(0) instanceof Join));
+
+ if (sortNodes.size() > 1) {
+ throw new AssertionError("Unexpected count of sort nodes: exp<=1, act=" + sortNodes.size());
+ }
+
+ return sortNodes.isEmpty() ? null : sortNodes.get(0);
+ }
+
+ /** Check that only appropriate conditions are acceptable for hash join. */
+ @ParameterizedTest()
+ @MethodSource("joinConditions")
+ @SuppressWarnings("ThrowableNotThrown")
+ public void hashJoinAppliedConditions(String sql, boolean canBePlanned) throws Exception {
+ IgniteTable tbl = createTestTable("ID", "C1");
+
+ IgniteSchema schema = createSchema(tbl);
+
+ for (String type : joinTypes) {
+ String sql0 = String.format(sql, type);
+
+ if (canBePlanned) {
+ assertPlan(sql0, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), disabledRules);
+ } else {
+ IgniteTestUtils.assertThrowsWithCause(() -> physicalPlan(sql0, schema, disabledRules),
+ CannotPlanException.class,
+ "There are not enough rules");
+ }
+ }
+ }
+
+ private static Stream<Arguments> joinConditions() {
+ return Stream.of(
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = t2.c1", true),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 using(c1)", true),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = 1", false),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 ON t1.id is not distinct from t2.c1", false),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = ?", false),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = OCTET_LENGTH('TEST')", false),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = LOG10(t1.c1)", false),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = t2.c1 and t1.ID > t2.ID", false),
+ Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = 1 and t2.c1 = 1", false)
+ );
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
index cc77935..a1c2850 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
@@ -160,7 +160,7 @@
assertPlan(query, igniteSchema, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
.and(nodeOrAnyChild(new TableScanWithProjection(expected.lhs)))
.and(nodeOrAnyChild(new TableScanWithProjection(expected.rhs)))
- ));
+ ), "HashJoinConverter", "NestedLoopJoinConverter");
}
/** Nested loop join - casts are added to condition operands. **/
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 6d18730..06061a3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -18,15 +18,18 @@
package org.apache.ignite.internal.sql.engine.planner;
import static org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.rel.AbstractIgniteJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
@@ -39,6 +42,8 @@
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.type.NativeTypes;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Test suite to verify join colocation.
@@ -48,7 +53,7 @@
* Join of the same tables with a simple affinity is expected to be colocated.
*/
@Test
- public void joinSameTableSimpleAff() throws Exception {
+ public void joinSameTableSimpleAffMergeJoin() throws Exception {
IgniteTable tbl = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
IgniteSchema schema = createSchema(tbl);
@@ -57,7 +62,7 @@
+ "from TEST_TBL t1 "
+ "join TEST_TBL t2 on t1.id = t2.id";
- RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+ RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
@@ -70,10 +75,37 @@
}
/**
- * Join of the same tables with a complex affinity is expected to be colocated.
+ * Join of the same tables with a simple affinity is expected to be colocated.
*/
@Test
- public void joinSameTableComplexAff() throws Exception {
+ public void joinSameTableSimpleAffHashJoin() throws Exception {
+ IgniteTable tbl = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
+
+ IgniteSchema schema = createSchema(tbl);
+
+ String sql = "select count(*) "
+ + "from TEST_TBL t1 "
+ + "join TEST_TBL t2 on t1.id = t2.id";
+
+ // Only hash join
+ RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "MergeJoinConverter");
+
+ AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
+ List<RelNode> joinNodes = findNodes(phys, byClass(AbstractIgniteJoin.class));
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, joinNodes.size(), equalTo(1));
+ assertThat(invalidPlanMsg, join, notNullValue());
+ assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+ }
+
+ /**
+ * Join of the same tables with a complex affinity is expected to be colocated.
+ */
+ @ParameterizedTest(name = "DISABLED: {0}")
+ @ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
+ public void joinSameTableComplexAff(String disabledRule) throws Exception {
IgniteTable tbl = complexTbl("TEST_TBL");
IgniteSchema schema = createSchema(tbl);
@@ -82,16 +114,74 @@
+ "from TEST_TBL t1 "
+ "join TEST_TBL t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
- RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+ RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
- IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
+ AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
assertThat(invalidPlanMsg, join, notNullValue());
assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
- assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
- assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+
+ if (!"MergeJoinConverter".equals(disabledRule)) {
+ assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
+ assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+ }
+ }
+
+ /**
+ * Join of the same tables with a complex affinity is expected to be colocated.
+ */
+ @ParameterizedTest(name = "DISABLED: {0}")
+ @ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
+ public void joinDiffTablesComplexAff(String disabledRule) throws Exception {
+ IgniteTable tbl1 = complexTbl("TEST_TBL1");
+ IgniteTable tbl2 = complexTbl("TEST_TBL2");
+
+ IgniteSchema schema = createSchema(tbl1, tbl2);
+
+ String sql = "select count(*) "
+ + "from TEST_TBL1 t1 "
+ + "join TEST_TBL2 t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
+
+ RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
+
+ AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, join, notNullValue());
+ assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+
+ if (!"MergeJoinConverter".equals(disabledRule)) {
+ assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
+ assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+ }
+ }
+
+ /**
+ * Join of the same tables with a complex affinity is expected to be colocated.
+ */
+ @ParameterizedTest(name = "DISABLED: {0}")
+ @ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
+ public void joinDiffTablesNotSupersetComplexAff(String disabledRule) throws Exception {
+ IgniteTable tbl1 = complexTbl("TEST_TBL1");
+ IgniteTable tbl2 = complexTbl("TEST_TBL2");
+
+ IgniteSchema schema = createSchema(tbl1, tbl2);
+
+ String sql = "select count(*) "
+ + "from TEST_TBL1 t1 "
+ + "join TEST_TBL2 t2 on t1.id1 = t2.id1";
+
+ RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
+
+ AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, join, notNullValue());
+ assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(false));
}
/**
@@ -124,7 +214,7 @@
))
))
))
- ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
}
/**
@@ -162,10 +252,10 @@
.and(scan -> complexTblIndirect.equals(scan.getTable().unwrap(IgniteTable.class)))
))
))
- ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
}
- private static IgniteTable simpleTable(String tableName, int size) {
+ static IgniteTable simpleTable(String tableName, int size) {
return TestBuilders.table()
.name(tableName)
.size(size)
@@ -180,7 +270,22 @@
.build();
}
- private static IgniteTable complexTbl(String tableName) {
+ static IgniteTable simpleTableHashPk(String tableName, int size) {
+ return TestBuilders.table()
+ .name(tableName)
+ .size(size)
+ .distribution(someAffinity())
+ .addColumn("ID", NativeTypes.INT32)
+ .addColumn("ID2", NativeTypes.INT32)
+ .addColumn("VAL", NativeTypes.STRING)
+ .hashIndex()
+ .name("PK")
+ .addColumn("ID")
+ .end()
+ .build();
+ }
+
+ static IgniteTable complexTbl(String tableName) {
return complexTbl(tableName, DEFAULT_TBL_SIZE,
IgniteDistributions.affinity(ImmutableIntList.of(0, 1), nextTableId(), DEFAULT_ZONE_ID));
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java
index 1e21564..d394d46 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java
@@ -115,7 +115,7 @@
// Use aggregates that are the same for both MAP and REDUCE phases.
String sql = "SELECT SUM(s.id), SUM(h.id) FROM SMALL s RIGHT JOIN HUGE h on h.id = s.id";
- IgniteRel phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+ IgniteRel phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
assertNotNull(phys);
@@ -127,7 +127,7 @@
assertEquals(JoinRelType.LEFT, join.getJoinType());
- PlanningContext ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+ PlanningContext ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
RelOptPlanner pl = ctx.cluster().getPlanner();
@@ -139,7 +139,7 @@
assertNotNull(phys);
- phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+ phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
join = findFirstNode(phys, byClass(IgniteNestedLoopJoin.class));
@@ -150,7 +150,7 @@
// no commute
assertEquals(JoinRelType.RIGHT, join.getJoinType());
- ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+ ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
pl = ctx.cluster().getPlanner();
@@ -166,7 +166,7 @@
// Use aggregates that are the same for both MAP and REDUCE phases.
String sql = "SELECT SUM(s.id), SUM(h.id) FROM SMALL s JOIN HUGE h on h.id = s.id";
- IgniteRel phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+ IgniteRel phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
assertNotNull(phys);
@@ -194,7 +194,7 @@
assertEquals(JoinRelType.INNER, join.getJoinType());
- PlanningContext ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+ PlanningContext ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
RelOptPlanner pl = ctx.cluster().getPlanner();
@@ -205,7 +205,7 @@
assertNotNull(phys);
- phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+ phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
join = findFirstNode(phys, byClass(IgniteNestedLoopJoin.class));
proj = findFirstNode(phys, byClass(IgniteProject.class));
@@ -231,7 +231,7 @@
// no commute
assertEquals(JoinRelType.INNER, join.getJoinType());
- ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+ ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
pl = ctx.cluster().getPlanner();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index 6447185..0a4c52d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -258,7 +258,8 @@
*/
@Test
public void noSortAppendingWithCorrectCollation() throws Exception {
- String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+ String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+ "HashJoinConverter"};
assertPlan(TestCase.CASE_16,
nodeOrAnyChild(isInstanceOf(IgniteSort.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index 14d8da0..0ca318b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -260,7 +260,8 @@
*/
@Test
public void noSortAppendingWithCorrectCollation() throws Exception {
- String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+ String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+ "HashJoinConverter"};
assertPlan(TestCase.CASE_16,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
index a0f95d8..0029c7a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
@@ -47,7 +47,8 @@
"NestedLoopJoinConverter",
"CorrelatedNestedLoopJoin",
"FilterSpoolMergeRule",
- "JoinCommuteRule"
+ "JoinCommuteRule",
+ "HashJoinConverter"
};
/**
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index 6dfd738..6fd93e0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -63,7 +63,7 @@
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
+ "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
);
IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
@@ -95,7 +95,7 @@
IgniteRel phys = physicalPlan(
sql,
publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
+ "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
);
System.out.println("+++ \n" + RelOptUtil.toString(phys));
@@ -150,7 +150,7 @@
})
.and(hasChildThat(isIndexScan("T1", "idx_jid")))
)),
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
+ "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
);
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
index 6cfd44d..547f8ec 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
@@ -52,7 +52,7 @@
+ "join t1 on t0.jid > t1.jid";
IgniteRel phys = physicalPlan(sql, publicSchema,
- "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
+ "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
assertNotNull(phys);
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 779cd1a..9dd930b 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -129,11 +129,10 @@
exchangeSourceNodes: {1=[N1]}
tree:
Project
- MergeJoin
+ HashJoin
Receiver(sourceFragment=1, exchange=1, distribution=single)
- Sort
- Filter
- TableFunctionScan(source=2, distribution=single)
+ Filter
+ TableFunctionScan(source=2, distribution=single)
Fragment#1
targetNodes: [N0]
@@ -142,8 +141,7 @@
partitions: {N1=[0:1]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
- Sort
- TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
+ TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
---
N0
diff --git a/modules/sql-engine/src/test/resources/mapping/hash_join.test b/modules/sql-engine/src/test/resources/mapping/hash_join.test
new file mode 100644
index 0000000..7c4ae58
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/hash_join.test
@@ -0,0 +1,102 @@
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n1 USING (id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [T1_N1, T2_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ Project
+ HashJoin
+ TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+ TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 t1 JOIN t1_n1 t2 ON t1.id = t2.id
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [T1_N1, T1_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ HashJoin
+ TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+ TableScan(name=PUBLIC.T1_N1, source=3, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n2 USING (id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#4
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [5]
+ exchangeSourceNodes: {5=[N2]}
+ tables: [T1_N1, T2_N2]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ Project
+ HashJoin
+ TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+ Receiver(sourceFragment=5, exchange=5, distribution=affinity[table: T2_N2, columns: [ID]])
+
+Fragment#5
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T2_N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=5, distribution=affinity[table: T2_N2, columns: [ID]])
+ TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, distribution=affinity[table: T2_N2, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1
+JOIN TABLE(SYSTEM_RANGE(0, 10)) r ON t1_n1.id = r.x
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Project
+ HashJoin
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+ TableFunctionScan(source=2, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N1, source=3, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/merge_join.test b/modules/sql-engine/src/test/resources/mapping/merge_join.test
index 5df9ab5..c1d88b6 100644
--- a/modules/sql-engine/src/test/resources/mapping/merge_join.test
+++ b/modules/sql-engine/src/test/resources/mapping/merge_join.test
@@ -1,5 +1,5 @@
N0
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * FROM t1_n1 JOIN t2_n1 USING (id)
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n1 USING (id)
---
Fragment#0 root
executionNodes: [N0]
@@ -24,7 +24,7 @@
---
N1
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * FROM t1_n1 JOIN t2_n1 USING (id)
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n1 USING (id)
---
Fragment#0 root
executionNodes: [N1]
@@ -49,7 +49,7 @@
---
N0
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * FROM t1_n1 JOIN t2_n2 USING (id)
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n2 USING (id)
---
Fragment#0 root
executionNodes: [N0]
diff --git a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 8fc500b..26ee11a 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -20,7 +20,7 @@
---
# Partition pruning of joined tables (relies on predicate push down)
N1
-SELECT * FROM t1_n1n2n3 as t1, t2_n4n5 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 42
+SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'HashJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1n2n3 as t1, t2_n4n5 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 42
---
Fragment#0 root
executionNodes: [N1]
@@ -53,7 +53,7 @@
---
# Self join, different predicates that produce same set of partitions
N1
-SELECT * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 17
+SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'HashJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 17
---
Fragment#0 root
executionNodes: [N1]
@@ -86,7 +86,7 @@
---
# Self join, different predicates that produce disjoint set of partitions
N1
-SELECT * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.id = t2.id and t1.id = 1 and t2.id = 42
+SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'HashJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.id = t2.id and t1.id = 1 and t2.id = 42
---
Fragment#0 root
executionNodes: [N1]
diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java
index 4ab357b..f70a565 100644
--- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java
+++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java
@@ -111,19 +111,29 @@
NESTED_LOOP(
"CorrelatedNestedLoopJoin",
"JoinCommuteRule",
- "MergeJoinConverter"
+ "MergeJoinConverter",
+ "HashJoinConverter"
),
MERGE(
"CorrelatedNestedLoopJoin",
"JoinCommuteRule",
- "NestedLoopJoinConverter"
+ "NestedLoopJoinConverter",
+ "HashJoinConverter"
),
CORRELATED(
"MergeJoinConverter",
"JoinCommuteRule",
- "NestedLoopJoinConverter"
+ "NestedLoopJoinConverter",
+ "HashJoinConverter"
+ ),
+
+ HASHJOIN(
+ "MergeJoinConverter",
+ "JoinCommuteRule",
+ "NestedLoopJoinConverter",
+ "CorrelatedNestedLoopJoin"
);
private final String[] disabledRules;