IGNITE-21720 Sql. Implement hash join (#3608)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlJoinBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlJoinBenchmark.java
new file mode 100644
index 0000000..3cc3ee2
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlJoinBenchmark.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.sql.engine.util.Commons.IN_BUFFER_SIZE;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.sql.IgniteSql;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that runs join sql queries via embedded client on clusters of different size.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@SuppressWarnings({"WeakerAccess", "unused"})
+public class SqlJoinBenchmark extends AbstractMultiNodeBenchmark {
+    private static final int TABLE_SIZE = 2 * IN_BUFFER_SIZE + 1;
+
+    private IgniteSql sql;
+
+    @Param({"1", "2"})
+    private int clusterSize;
+
+    /** Fills the table with data. */
+    @Setup
+    public void setUp() throws IOException {
+        populateTable(TABLE_NAME, TABLE_SIZE, 1_000);
+
+        sql = clusterNode.sql();
+    }
+
+    /**
+     * Benchmark left hash join.
+     */
+    @Benchmark
+    @OutputTimeUnit(TimeUnit.MICROSECONDS)
+    public void leftHashJoin(Blackhole bh) {
+        try (var rs = sql.execute(null, ""
+                + "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ t1.field1 "
+                + "FROM usertable t1 "
+                + "LEFT JOIN usertable t2 "
+                + "on t1.field2 = t2.field2")) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Benchmark left merge join.
+     */
+    @Benchmark
+    @OutputTimeUnit(TimeUnit.MICROSECONDS)
+    public void leftMergeJoin(Blackhole bh) {
+        try (var rs = sql.execute(null, ""
+                + "SELECT /*+ DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ t1.field1 "
+                + "FROM usertable t1 "
+                + "LEFT JOIN usertable t2 "
+                + "on t1.field2 = t2.field2")) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Benchmark left nl join.
+     */
+    @Benchmark
+    @OutputTimeUnit(TimeUnit.MICROSECONDS)
+    public void leftNestedJoin(Blackhole bh) {
+        try (var rs = sql.execute(null, ""
+                + "SELECT /*+ DISABLE_RULE('HashJoinConverter', 'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ t1.field1 "
+                + "FROM usertable t1 "
+                + "LEFT JOIN usertable t2 "
+                + "on t1.field2 = t2.field2")) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + SqlJoinBenchmark.class.getSimpleName() + ".*Join")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    @Override
+    protected int nodes() {
+        return clusterSize;
+    }
+}
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index 86cbb06..4d5bf44 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -324,6 +324,7 @@
                 + "AS agg ON t2_colo_va1.val1 = agg.val1";
 
         assertQuery(sql)
+                .disableRules("HashJoinConverter", "MergeJoinConverter")
                 .matches(QueryChecker.matches(".*Exchange.*Join.*Colocated.*Aggregate.*"))
                 .returns("val0", 50L)
                 .returns("val1", 50L)
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index 99a804b..ac504d1 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -25,7 +25,8 @@
 
 /** Tests for correlated queries. */
 public class ItCorrelatesTest extends BaseSqlIntegrationTest {
-    private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ ";
+    private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter', "
+            + "'HashJoinConverter') */ ";
 
     @AfterEach
     public void dropTables() {
@@ -49,7 +50,7 @@
      * Tests resolving of collisions in correlates with correlate variables in the left hand.
      */
     @Test
-    public void testCorrelatesCollisionLeft() throws InterruptedException {
+    public void testCorrelatesCollisionLeft() {
         sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
         sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
 
@@ -68,7 +69,7 @@
      * Tests resolving of collisions in correlates with correlate variables in both, left and right hands.
      */
     @Test
-    public void testCorrelatesCollisionRight() throws InterruptedException {
+    public void testCorrelatesCollisionRight() {
         sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
         sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
 
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java
index 939845a..c7ddb8e 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItHashSpoolIntegrationTest.java
@@ -57,7 +57,8 @@
                 + "FROM t0 JOIN t1 ON t0.i1=t1.i1 AND t0.i2=t1.i2";
 
         assertQuery(sql)
-                .disableRules("MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule")
+                .disableRules("HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter",
+                        "FilterSpoolMergeToSortedIndexSpoolRule")
                 .returns(1, 1, 1, 1)
                 .check();
     }
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index 2ca44e8..7c8a206 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -80,7 +80,7 @@
     public void test(int rows, int partitions) throws InterruptedException {
         prepareDataSet(rows, partitions);
 
-        var res = sql("SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */"
+        var res = sql("SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter', 'HashJoinConverter') */"
                         + "T0.val, T1.val FROM TEST0 as T0 "
                         + "JOIN TEST1 as T1 on T0.jid = T1.jid "
         );
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
index 600f60f..191ef5b 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
@@ -38,6 +38,9 @@
     public static void beforeTestsStarted() {
         sql("CREATE TABLE t1 (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT, c3 INT)");
         sql("CREATE TABLE t2 (id INT PRIMARY KEY, c1 INT NOT NULL, c2 INT, c3 INT)");
+        sql("CREATE TABLE t3 (id INT PRIMARY KEY, c1 INTEGER)");
+        sql("CREATE TABLE checkNulls1 (id INT PRIMARY KEY, c1 INTEGER, c2 INTEGER)");
+        sql("CREATE TABLE checkNulls2 (id INT PRIMARY KEY, c1 INTEGER, c2 INTEGER)");
 
         sql("create index t1_idx on t1 (c3, c2, c1)");
         sql("create index t2_idx on t2 (c3, c2, c1)");
@@ -59,6 +62,208 @@
                 new Object[] {4, 3, 3, 3},
                 new Object[] {5, 4, 4, 4}
         );
+
+        insertData("t3", List.of("ID", "C1"),
+                new Object[] {0, 1},
+                new Object[] {1, 2},
+                new Object[] {2, 3},
+                new Object[] {3, null},
+                new Object[] {7, 7},
+                new Object[] {8, 8}
+        );
+
+        insertData("checkNulls1", List.of("ID", "C1", "C2"),
+                new Object[] {0, null, 1}
+        );
+
+        insertData("checkNulls2", List.of("ID", "C1", "C2"),
+                new Object[] {0, 1, null}
+        );
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testCheckNullsNullFirstLeftJoin(JoinType joinType) {
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 left join checkNulls1 t2 on t1.c1 = t2.c1;",
+                joinType
+        )
+                .returns(null, 1, null, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 left join checkNulls1 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, 1, null, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 left join checkNulls1 t2 on t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, 1, null, 1)
+                .check();
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testCheckNullsNullFirstRightJoin(JoinType joinType) {
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 right join checkNulls1 t2 on t1.c1 = t2.c1;",
+                joinType
+        )
+                .returns(null, null, null, 1)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 right join checkNulls1 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, null, null, 1)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 right join checkNulls1 t2 on t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, 1, null, 1)
+                .check();
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testCheckNullsNullFirstInnerJoin(JoinType joinType) {
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 inner join checkNulls1 t2 on t1.c1 = t2.c1;",
+                joinType
+        )
+                .returnNothing()
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 inner join checkNulls1 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+                joinType
+        )
+                .returnNothing()
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls1 t1 inner join checkNulls1 t2 on t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, 1, null, 1)
+                .check();
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testCheckNullsNullSecondLeftJoin(JoinType joinType) {
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 left join checkNulls2 t2 on t1.c1 = t2.c1;",
+                joinType
+        )
+                .returns(1, null, 1, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 left join checkNulls2 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(1, null, null, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 left join checkNulls2 t2 on t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(1, null, null, null)
+                .check();
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testCheckNullsNullSecondRightJoin(JoinType joinType) {
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 right join checkNulls2 t2 on t1.c1 = t2.c1;",
+                joinType
+        )
+                .returns(1, null, 1, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 right join checkNulls2 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, null, 1, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 right join checkNulls2 t2 on t1.c2 = t2.c2;",
+                joinType
+        )
+                .returns(null, null, 1, null)
+                .check();
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testCheckNullsNullSecondInnerJoin(JoinType joinType) {
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 inner join checkNulls2 t2 on t1.c1 = t2.c1;",
+                joinType
+        )
+                .returns(1, null, 1, null)
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 inner join checkNulls2 t2 on t1.c1 = t2.c1 and t1.c2 = t2.c2;",
+                joinType
+        )
+                .returnNothing()
+                .check();
+
+        assertQuery("select t1.c1, t1.c2, t2.c1, t2.c2 "
+                        + "from checkNulls2 t1 inner join checkNulls2 t2 on t1.c2 = t2.c2;",
+                joinType
+        )
+                .returnNothing()
+                .check();
+    }
+
+    @ParameterizedTest
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
+    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    public void testFullOuterJoin(JoinType joinType) {
+        assertQuery(""
+                        + "select t1.c1 c11, t1.c2 c12, t1.c3 c13, t3.id c21, t3.c1 c22 "
+                        + "  from t1 "
+                        + "  full outer join t3 "
+                        + "    on t1.c1 = t3.id "
+                        + "   and t1.c2 = t3.c1 "
+                        + " order by t1.c1, t1.c2, t1.c3, t3.id",
+                joinType
+        )
+                .ordered()
+                .returns(1, 1, 1, null, null)
+                .returns(2, 2, 2, null, null)
+                .returns(2, null, 2, null, null)
+                .returns(3, 3, 3, null, null)
+                .returns(3, 3, null, null, null)
+                .returns(4, 4, 4, null, null)
+                .returns(null, null, null, 0, 1)
+                .returns(null, null, null, 1, 2)
+                .returns(null, null, null, 2, 3)
+                .returns(null, null, null, 3, null)
+                .returns(null, null, null, 7, 7)
+                .returns(null, null, null, 8, 8)
+                .check();
     }
 
     /**
@@ -68,6 +273,15 @@
     // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
     @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
     public void testInnerJoin(JoinType joinType) {
+        assertQuery("SELECT c1 FROM t3 WHERE c1 = ANY(SELECT c1 FROM t3) ORDER BY c1", joinType)
+                .ordered()
+                .returns(1)
+                .returns(2)
+                .returns(3)
+                .returns(7)
+                .returns(8)
+                .check();
+
         assertQuery(""
                 + "select t1.c1 c11, t1.c2 c12, t1.c3 c13, t2.c1 c21, t2.c2 c22 "
                 + "  from t1 "
@@ -250,13 +464,23 @@
      */
     @ParameterizedTest
     // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove exclude
-    @EnumSource(mode = Mode.EXCLUDE, names = "CORRELATED")
+    @EnumSource(mode = Mode.EXCLUDE, names = {"CORRELATED"})
     public void testLeftJoin(JoinType joinType) {
+        assertQuery("select t31.c1 from t3 t31 left join t3 t32 on t31.c1 = t32.c1 ORDER BY t31.c1;", joinType)
+                .ordered()
+                .returns(1)
+                .returns(2)
+                .returns(3)
+                .returns(7)
+                .returns(8)
+                .returns(null)
+                .check();
+
         assertQuery(""
                 + "select t1.c1 c11, t1.c2 c12, t1.c3 c13, t2.c1 c21, t2.c2 c22 "
                 + "  from t1 "
                 + "  left join t2 "
-                + "    on t1.c1 = t2.c1 "
+                + "    on t1.c1 = t2.c1"
                 + "   and t1.c2 = t2.c2 "
                 + " order by t1.c1, t1.c2, t1.c3",
                 joinType
@@ -843,6 +1067,8 @@
         Stream<Arguments> types = Arrays.stream(JoinType.values())
                 // TODO: https://issues.apache.org/jira/browse/IGNITE-21286 remove filter below
                 .filter(type -> type != JoinType.CORRELATED)
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-22074 hash join to make a deal with "is not distinct" expression
+                .filter(type -> type != JoinType.HASHJOIN)
                 .flatMap(v -> Stream.of(Arguments.of(v, false), Arguments.of(v, true)));
 
         return types;
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index 41ce7ce..cbbe218 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -126,7 +126,7 @@
     @Test
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-21286")
     public void testIndexLoopJoin() {
-        assertQuery("SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ d1.name, d2.name "
+        assertQuery("SELECT /*+ DISABLE_RULE('HashJoinConverter', 'MergeJoinConverter', 'NestedLoopJoinConverter') */ d1.name, d2.name "
                 + "FROM Developer d1, Developer d2 WHERE d1.id = d2.id")
                 .matches(containsSubPlan("CorrelatedNestedLoopJoin"))
                 .returns("Bach", "Bach")
@@ -916,7 +916,7 @@
             String sql = "SELECT t1.i1, t2.i1 FROM t t1 LEFT JOIN t t2 ON t1.i2 = t2.i1";
 
             assertQuery(sql)
-                    .disableRules("NestedLoopJoinConverter", "MergeJoinConverter")
+                    .disableRules("NestedLoopJoinConverter", "MergeJoinConverter", "HashJoinConverter")
                     .matches(containsSubPlan("CorrelatedNestedLoopJoin"))
                     .matches(containsIndexScan("PUBLIC", "T", "T_IDX"))
                     .returns(0, null)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 5ebeedd..4a09b90 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -57,6 +57,7 @@
 import org.apache.ignite.internal.sql.engine.exec.rel.DataSourceScanNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode;
+import org.apache.ignite.internal.sql.engine.exec.rel.HashJoinNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.IndexScanNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.IndexSpoolNode;
@@ -81,6 +82,7 @@
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
@@ -263,6 +265,24 @@
 
     /** {@inheritDoc} */
     @Override
+    public Node<RowT> visit(IgniteHashJoin rel) {
+        RelDataType outType = rel.getRowType();
+        RelDataType leftType = rel.getLeft().getRowType();
+        RelDataType rightType = rel.getRight().getRowType();
+        JoinRelType joinType = rel.getJoinType();
+
+        Node<RowT> node = HashJoinNode.create(ctx, outType, leftType, rightType, joinType, rel.analyzeCondition());
+
+        Node<RowT> leftInput = visit(rel.getLeft());
+        Node<RowT> rightInput = visit(rel.getRight());
+
+        node.register(asList(leftInput, rightInput));
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public Node<RowT> visit(IgniteCorrelatedNestedLoopJoin rel) {
         RelDataType leftType = rel.getLeft().getRowType();
         RelDataType rightType = rel.getRight().getRowType();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index 520fc69..d466d98 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -36,6 +36,7 @@
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
@@ -154,6 +155,11 @@
         }
 
         @Override
+        public Mapping visit(IgniteHashJoin rel) {
+            return mapBiRel(rel);
+        }
+
+        @Override
         public Mapping visit(IgniteCorrelatedNestedLoopJoin rel) {
             return mapBiRel(rel);
         }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
new file mode 100644
index 0000000..194391e
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.jetbrains.annotations.Nullable;
+
+/** Right part materialized join node, i.e. all data from right part of join is available locally. */
+public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlights that all row were received and we are not waiting any more. */
+    static final int NOT_WAITING = -1;
+
+    protected boolean inLoop;
+    protected int requested;
+    int waitingLeft;
+    int waitingRight;
+    final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize);
+    protected @Nullable RowT left;
+
+    AbstractRightMaterializedJoinNode(ExecutionContext<RowT> ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert !nullOrEmpty(sources()) && sources().size() == 2;
+        assert rowsCnt > 0 && requested == 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::doJoin, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waitingLeft = 0;
+        waitingRight = 0;
+        left = null;
+
+        leftInBuf.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Downstream<RowT> requestDownstream(int idx) {
+        if (idx == 0) {
+            return new Downstream<>() {
+                /** {@inheritDoc} */
+                @Override
+                public void push(RowT row) throws Exception {
+                    pushLeft(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public void end() throws Exception {
+                    endLeft();
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public void onError(Throwable e) {
+                    AbstractRightMaterializedJoinNode.this.onError(e);
+                }
+            };
+        } else if (idx == 1) {
+            return new Downstream<>() {
+                /** {@inheritDoc} */
+                @Override
+                public void push(RowT row) throws Exception {
+                    pushRight(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public void end() throws Exception {
+                    endRight();
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public void onError(Throwable e) {
+                    AbstractRightMaterializedJoinNode.this.onError(e);
+                }
+            };
+        }
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    private void pushLeft(RowT row) throws Exception {
+        assert downstream() != null;
+        assert waitingLeft > 0;
+
+        checkState();
+
+        waitingLeft--;
+
+        leftInBuf.add(row);
+
+        join();
+    }
+
+    private void endLeft() throws Exception {
+        assert downstream() != null;
+        assert waitingLeft > 0;
+
+        checkState();
+
+        waitingLeft = NOT_WAITING;
+
+        join();
+    }
+
+    private void endRight() throws Exception {
+        assert downstream() != null;
+        assert waitingRight > 0;
+
+        checkState();
+
+        waitingRight = NOT_WAITING;
+
+        join();
+    }
+
+    Node<RowT> leftSource() {
+        return sources().get(0);
+    }
+
+    Node<RowT> rightSource() {
+        return sources().get(1);
+    }
+
+    private void doJoin() throws Exception {
+        checkState();
+
+        join();
+    }
+
+    protected abstract void join() throws Exception;
+
+    protected abstract void pushRight(RowT row) throws Exception;
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
new file mode 100644
index 0000000..c07258a
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
+
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowBuilder;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+
+/** HashJoin implementor. */
+public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNode<RowT> {
+    final Map<RowWrapper<RowT>, TouchedCollection<RowT>> hashStore = new Object2ObjectOpenHashMap<>();
+    protected final RowHandler<RowT> handler;
+
+    private final List<Integer> leftJoinPositions;
+    private final List<Integer> rightJoinPositions;
+
+    final boolean touchResults;
+
+    Iterator<RowT> rightIt = Collections.emptyIterator();
+    private final RowSchema rightJoinRelatedRowSchema;
+    private final RowSchema leftJoinRelatedRowSchema;
+
+    private final RowBuilder<RowT> leftRowBuilder;
+    private final RowBuilder<RowT> rightRowBuilder;
+
+    private HashJoinNode(ExecutionContext<RowT> ctx, JoinInfo joinInfo, boolean touch,
+            RelDataType leftRowType, RelDataType rightRowType) {
+        super(ctx);
+
+        handler = ctx.rowHandler();
+        touchResults = touch;
+
+        leftJoinPositions = joinInfo.leftKeys.toIntegerList();
+        rightJoinPositions = joinInfo.rightKeys.toIntegerList();
+
+        assert leftJoinPositions.size() == rightJoinPositions.size();
+
+        ImmutableIntList rightKeys = joinInfo.rightKeys;
+        List<RelDataType> rightTypes = new ArrayList<>(rightKeys.size());
+        List<RelDataTypeField> rightFields = rightRowType.getFieldList();
+        for (int rightPos : rightKeys) {
+            rightTypes.add(rightFields.get(rightPos).getType());
+        }
+        rightJoinRelatedRowSchema = rowSchemaFromRelTypes(rightTypes);
+
+        ImmutableIntList leftKeys = joinInfo.leftKeys;
+        List<RelDataType> leftTypes = new ArrayList<>(leftKeys.size());
+        List<RelDataTypeField> leftFields = leftRowType.getFieldList();
+        for (int leftPos : leftKeys) {
+            leftTypes.add(leftFields.get(leftPos).getType());
+        }
+        leftJoinRelatedRowSchema = rowSchemaFromRelTypes(leftTypes);
+
+        RowFactory<RowT> leftRowFactory = handler.factory(leftJoinRelatedRowSchema);
+        leftRowBuilder = leftRowFactory.rowBuilder();
+
+        RowFactory<RowT> rightRowFactory = handler.factory(rightJoinRelatedRowSchema);
+        rightRowBuilder = rightRowFactory.rowBuilder();
+    }
+
+    @Override
+    protected void rewindInternal() {
+        rightIt = Collections.emptyIterator();
+
+        hashStore.clear();
+
+        super.rewindInternal();
+    }
+
+    /** Supplied algorithm implementation. */
+    public static <RowT> HashJoinNode<RowT> create(ExecutionContext<RowT> ctx, RelDataType outputRowType,
+            RelDataType leftRowType, RelDataType rightRowType, JoinRelType joinType, JoinInfo joinInfo) {
+
+        switch (joinType) {
+            case INNER:
+                return new InnerHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
+
+            case LEFT: {
+                RowSchema rightRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rightRowType));
+                RowHandler.RowFactory<RowT> rightRowFactory = ctx.rowHandler().factory(rightRowSchema);
+
+                return new LeftHashJoin<>(ctx, rightRowFactory, joinInfo, leftRowType, rightRowType);
+            }
+
+            case RIGHT: {
+                RowSchema leftRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(leftRowType));
+                RowHandler.RowFactory<RowT> leftRowFactory = ctx.rowHandler().factory(leftRowSchema);
+
+                return new RightHashJoin<>(ctx, leftRowFactory, joinInfo, leftRowType, rightRowType);
+            }
+
+            case FULL: {
+                RowSchema leftRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(leftRowType));
+                RowSchema rightRowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rightRowType));
+                RowHandler.RowFactory<RowT> leftRowFactory = ctx.rowHandler().factory(leftRowSchema);
+                RowHandler.RowFactory<RowT> rightRowFactory = ctx.rowHandler().factory(rightRowSchema);
+
+                return new FullOuterHashJoin<>(ctx, leftRowFactory, rightRowFactory, joinInfo, leftRowType, rightRowType);
+            }
+
+            case SEMI:
+                return new SemiHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
+
+            case ANTI:
+                return new AntiHashJoin<>(ctx, joinInfo, leftRowType, rightRowType);
+
+            default:
+                throw new IllegalStateException("Join type \"" + joinType + "\" is not supported yet");
+        }
+    }
+
+    private static class InnerHashJoin<RowT> extends HashJoinNode<RowT> {
+        private InnerHashJoin(
+                ExecutionContext<RowT> ctx,
+                JoinInfo joinInfo,
+                RelDataType leftRowType,
+                RelDataType rightRowType
+        ) {
+            super(ctx, joinInfo, false, leftRowType, rightRowType);
+        }
+
+        @Override
+        protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+                try {
+                    while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        if (!rightIt.hasNext()) {
+                            left = leftInBuf.remove();
+
+                            Collection<RowT> rightRows = lookup(left, touchResults);
+
+                            rightIt = rightRows.iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            while (rightIt.hasNext()) {
+                                checkState();
+
+                                RowT right = rightIt.next();
+
+                                --requested;
+
+                                RowT row = handler.concat(left, right);
+                                downstream().push(row);
+
+                                if (requested == 0) {
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (!rightIt.hasNext()) {
+                            left = null;
+                        }
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            getMoreOrEnd();
+        }
+    }
+
+    private static class LeftHashJoin<RowT> extends HashJoinNode<RowT> {
+        /** Right row factory. */
+        private final RowHandler.RowFactory<RowT> rightRowFactory;
+
+        private LeftHashJoin(
+                ExecutionContext<RowT> ctx,
+                RowHandler.RowFactory<RowT> rightRowFactory,
+                JoinInfo joinInfo,
+                RelDataType leftRowType,
+                RelDataType rightRowType
+        ) {
+            super(ctx, joinInfo, false, leftRowType, rightRowType);
+
+            this.rightRowFactory = rightRowFactory;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+                try {
+                    while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        checkState();
+
+                        if (!rightIt.hasNext()) {
+                            left = leftInBuf.remove();
+
+                            Collection<RowT> rightRows = lookup(left, touchResults);
+
+                            if (rightRows.isEmpty()) {
+                                requested--;
+                                downstream().push(handler.concat(left, rightRowFactory.create()));
+                            }
+
+                            rightIt = rightRows.iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            while (rightIt.hasNext()) {
+                                checkState();
+
+                                RowT right = rightIt.next();
+
+                                --requested;
+
+                                RowT row = handler.concat(left, right);
+                                downstream().push(row);
+
+                                if (requested == 0) {
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (!rightIt.hasNext()) {
+                            left = null;
+                        }
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            getMoreOrEnd();
+        }
+    }
+
+    private static class RightHashJoin<RowT> extends HashJoinNode<RowT> {
+        /** Left row factory. */
+        private final RowHandler.RowFactory<RowT> leftRowFactory;
+
+        private RightHashJoin(
+                ExecutionContext<RowT> ctx,
+                RowHandler.RowFactory<RowT> leftRowFactory,
+                JoinInfo joinInfo,
+                RelDataType leftRowType,
+                RelDataType rightRowType
+        ) {
+            super(ctx, joinInfo, true, leftRowType, rightRowType);
+
+            this.leftRowFactory = leftRowFactory;
+        }
+
+        @Override
+        protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+                try {
+                    while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        checkState();
+
+                        if (!rightIt.hasNext()) {
+                            left = leftInBuf.remove();
+
+                            Collection<RowT> rightRows = lookup(left, touchResults);
+
+                            rightIt = rightRows.iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            while (rightIt.hasNext()) {
+                                checkState();
+
+                                RowT right = rightIt.next();
+
+                                --requested;
+
+                                RowT row = handler.concat(left, right);
+                                downstream().push(row);
+
+                                if (requested == 0) {
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (!rightIt.hasNext()) {
+                            left = null;
+                        }
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) {
+                inLoop = true;
+                try {
+                    if (!rightIt.hasNext()) {
+                        rightIt = getUntouched(hashStore);
+                    }
+
+                    RowT emptyLeft = leftRowFactory.create();
+
+                    while (rightIt.hasNext()) {
+                        checkState();
+                        RowT right = rightIt.next();
+                        RowT row = handler.concat(emptyLeft, right);
+                        --requested;
+
+                        downstream().push(row);
+
+                        if (requested == 0) {
+                            break;
+                        }
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            getMoreOrEnd();
+        }
+    }
+
+    private static class FullOuterHashJoin<RowT> extends HashJoinNode<RowT> {
+        /** Left row factory. */
+        private final RowHandler.RowFactory<RowT> leftRowFactory;
+
+        /** Right row factory. */
+        private final RowHandler.RowFactory<RowT> rightRowFactory;
+
+        private FullOuterHashJoin(
+                ExecutionContext<RowT> ctx,
+                RowHandler.RowFactory<RowT> leftRowFactory,
+                RowHandler.RowFactory<RowT> rightRowFactory,
+                JoinInfo joinInfo,
+                RelDataType leftRowType,
+                RelDataType rightRowType
+        ) {
+            super(ctx, joinInfo, true, leftRowType, rightRowType);
+
+            this.leftRowFactory = leftRowFactory;
+            this.rightRowFactory = rightRowFactory;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+                try {
+                    while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        checkState();
+
+                        if (!rightIt.hasNext()) {
+                            left = leftInBuf.remove();
+
+                            Collection<RowT> rightRows = lookup(left, touchResults);
+
+                            if (rightRows.isEmpty()) {
+                                requested--;
+                                downstream().push(handler.concat(left, rightRowFactory.create()));
+                            }
+
+                            rightIt = rightRows.iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            while (rightIt.hasNext()) {
+                                checkState();
+
+                                RowT right = rightIt.next();
+
+                                --requested;
+
+                                RowT row = handler.concat(left, right);
+                                downstream().push(row);
+
+                                if (requested == 0) {
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (!rightIt.hasNext()) {
+                            left = null;
+                        }
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            if (left == null && !rightIt.hasNext() && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING
+                    && waitingRight == NOT_WAITING && requested > 0) {
+                inLoop = true;
+                try {
+                    if (!rightIt.hasNext()) {
+                        rightIt = getUntouched(hashStore);
+                    }
+
+                    RowT emptyLeft = leftRowFactory.create();
+
+                    while (rightIt.hasNext()) {
+                        checkState();
+                        RowT right = rightIt.next();
+                        RowT row = handler.concat(emptyLeft, right);
+                        --requested;
+
+                        downstream().push(row);
+
+                        if (requested == 0) {
+                            break;
+                        }
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            getMoreOrEnd();
+        }
+    }
+
+    private static class SemiHashJoin<RowT> extends HashJoinNode<RowT> {
+        private SemiHashJoin(
+                ExecutionContext<RowT> ctx,
+                JoinInfo joinInfo,
+                RelDataType leftRowType,
+                RelDataType rightRowType
+        ) {
+            super(ctx, joinInfo, false, leftRowType, rightRowType);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+                try {
+                    while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        checkState();
+
+                        left = leftInBuf.remove();
+
+                        Collection<RowT> rightRows = lookup(left, touchResults);
+
+                        if (!rightRows.isEmpty()) {
+                            requested--;
+
+                            downstream().push(left);
+
+                            if (requested == 0) {
+                                break;
+                            }
+                        }
+
+                        left = null;
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            getMoreOrEnd();
+        }
+    }
+
+    private static class AntiHashJoin<RowT> extends HashJoinNode<RowT> {
+        private AntiHashJoin(
+                ExecutionContext<RowT> ctx,
+                JoinInfo joinInfo,
+                RelDataType leftRowType,
+                RelDataType rightRowType
+        ) {
+            super(ctx, joinInfo, false, leftRowType, rightRowType);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+                try {
+                    while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        checkState();
+
+                        left = leftInBuf.remove();
+
+                        Collection<RowT> rightRows = lookup(left, touchResults);
+
+                        if (rightRows.isEmpty()) {
+                            requested--;
+
+                            downstream().push(left);
+
+                            if (requested == 0) {
+                                break;
+                            }
+                        }
+
+                        left = null;
+                    }
+                } finally {
+                    inLoop = false;
+                }
+            }
+
+            getMoreOrEnd();
+        }
+    }
+
+    Collection<RowT> lookup(RowT row, boolean processTouched) {
+        Collection<RowT> coll = Collections.emptyList();
+
+        for (Integer entry : leftJoinPositions) {
+            Object ent = handler.get(entry, row);
+
+            if (ent == null) {
+                leftRowBuilder.reset();
+                return Collections.emptyList();
+            }
+
+            leftRowBuilder.addField(ent);
+        }
+
+        RowWrapper<RowT> row0 = new RowWrapper<>(leftRowBuilder.buildAndReset(), handler, leftJoinPositions.size());
+
+        TouchedCollection<RowT> found = hashStore.get(row0);
+
+        if (found != null) {
+            coll = found.items();
+
+            if (processTouched) {
+                found.touched = true;
+            }
+        }
+
+        return coll;
+    }
+
+    private static <RowT> Iterator<RowT> getUntouched(Map<RowWrapper<RowT>, TouchedCollection<RowT>> entries) {
+        return new Iterator<RowT>() {
+            private final Iterator<TouchedCollection<RowT>> it = entries.values().iterator();
+            private Iterator<RowT> innerIt = Collections.emptyIterator();
+
+            @Override
+            public boolean hasNext() {
+                if (innerIt.hasNext()) {
+                    return true;
+                }
+
+                advance();
+
+                return innerIt.hasNext();
+            }
+
+            @Override
+            public RowT next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return innerIt.next();
+            }
+
+            void advance() {
+                while (it.hasNext()) {
+                    TouchedCollection<RowT> coll = it.next();
+                    if (!coll.touched && !coll.items().isEmpty()) {
+                        innerIt = coll.items().iterator();
+                        break;
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    protected void pushRight(RowT row) throws Exception {
+        assert downstream() != null;
+        assert waitingRight > 0;
+
+        checkState();
+
+        waitingRight--;
+
+        for (Integer entry : rightJoinPositions) {
+            Object ent = handler.get(entry, row);
+            rightRowBuilder.addField(ent);
+        }
+
+        RowWrapper<RowT> row0 = new RowWrapper<>(rightRowBuilder.buildAndReset(), handler, rightJoinPositions.size());
+        TouchedCollection<RowT> raw = hashStore.computeIfAbsent(row0, k -> new TouchedCollection<>());
+        raw.add(row);
+
+        if (waitingRight == 0) {
+            rightSource().request(waitingRight = inBufSize);
+        }
+    }
+
+    private static class RowWrapper<RowT> {
+        RowT row;
+        RowHandler<RowT> handler;
+        int itemsCount;
+
+        RowWrapper(RowT row, RowHandler<RowT> handler, int itemsCount) {
+            this.row = row;
+            this.handler = handler;
+            this.itemsCount = itemsCount;
+        }
+
+        @Override
+        public int hashCode() {
+            int hashCode = 0;
+            for (int i = 0; i < itemsCount; ++i) {
+                Object entHold = handler.get(i, row);
+                hashCode += Objects.hashCode(entHold);
+            }
+            return hashCode;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            RowWrapper<RowT> row0 = (RowWrapper<RowT>) obj;
+            for (int i = 0; i < itemsCount; ++i) {
+                Object input = handler.get(i, row0.row);
+                Object current = handler.get(i, row);
+                boolean comp = Objects.equals(input, current);
+                if (!comp) {
+                    return comp;
+                }
+            }
+            return true;
+        }
+    }
+
+    void getMoreOrEnd() throws Exception {
+        if (waitingRight == 0) {
+            rightSource().request(waitingRight = inBufSize);
+        }
+
+        if (waitingLeft == 0 && leftInBuf.isEmpty()) {
+            leftSource().request(waitingLeft = inBufSize);
+        }
+
+        if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && leftInBuf.isEmpty() && left == null
+                && !rightIt.hasNext()) {
+            requested = 0;
+            downstream().end();
+        }
+    }
+
+    private static class TouchedCollection<RowT> {
+        Collection<RowT> coll;
+        boolean touched;
+
+        TouchedCollection() {
+            this.coll = new ArrayList<>();
+        }
+
+        void add(RowT row) {
+            coll.add(row);
+        }
+
+        Collection<RowT> items() {
+            return coll;
+        }
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java
index bcbebb0..a2807b1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java
@@ -18,12 +18,9 @@
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Deque;
 import java.util.List;
 import java.util.function.BiPredicate;
 import org.apache.calcite.plan.RelOptUtil;
@@ -38,25 +35,12 @@
  * NestedLoopJoinNode.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public abstract class NestedLoopJoinNode<RowT> extends AbstractNode<RowT> {
-    /** Special value to highlights that all row were received and we are not waiting any more. */
-    protected static final int NOT_WAITING = -1;
-
+public abstract class NestedLoopJoinNode<RowT> extends AbstractRightMaterializedJoinNode<RowT> {
     protected final BiPredicate<RowT, RowT> cond;
 
     protected final RowHandler<RowT> handler;
 
-    protected int requested;
-
-    protected int waitingLeft;
-
-    protected int waitingRight;
-
-    protected final List<RowT> rightMaterialized = new ArrayList<>(inBufSize);
-
-    protected final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize);
-
-    protected boolean inLoop;
+    final List<RowT> rightMaterialized = new ArrayList<>(inBufSize);
 
     /**
      * Constructor.
@@ -65,7 +49,7 @@
      * @param ctx  Execution context.
      * @param cond Join expression.
      */
-    private NestedLoopJoinNode(ExecutionContext<RowT> ctx, BiPredicate<RowT, RowT> cond) {
+    NestedLoopJoinNode(ExecutionContext<RowT> ctx, BiPredicate<RowT, RowT> cond) {
         super(ctx);
 
         this.cond = cond;
@@ -74,98 +58,14 @@
 
     /** {@inheritDoc} */
     @Override
-    public void request(int rowsCnt) throws Exception {
-        assert !nullOrEmpty(sources()) && sources().size() == 2;
-        assert rowsCnt > 0 && requested == 0;
-
-        checkState();
-
-        requested = rowsCnt;
-
-        if (!inLoop) {
-            context().execute(this::doJoin, this::onError);
-        }
-    }
-
-    private void doJoin() throws Exception {
-        checkState();
-
-        join();
-    }
-
-    /** {@inheritDoc} */
-    @Override
     protected void rewindInternal() {
-        requested = 0;
-        waitingLeft = 0;
-        waitingRight = 0;
-
         rightMaterialized.clear();
-        leftInBuf.clear();
+
+        super.rewindInternal();
     }
 
-    /** {@inheritDoc} */
     @Override
-    protected Downstream<RowT> requestDownstream(int idx) {
-        if (idx == 0) {
-            return new Downstream<RowT>() {
-                /** {@inheritDoc} */
-                @Override
-                public void push(RowT row) throws Exception {
-                    pushLeft(row);
-                }
-
-                /** {@inheritDoc} */
-                @Override
-                public void end() throws Exception {
-                    endLeft();
-                }
-
-                /** {@inheritDoc} */
-                @Override
-                public void onError(Throwable e) {
-                    NestedLoopJoinNode.this.onError(e);
-                }
-            };
-        } else if (idx == 1) {
-            return new Downstream<RowT>() {
-                /** {@inheritDoc} */
-                @Override
-                public void push(RowT row) throws Exception {
-                    pushRight(row);
-                }
-
-                /** {@inheritDoc} */
-                @Override
-                public void end() throws Exception {
-                    endRight();
-                }
-
-                /** {@inheritDoc} */
-                @Override
-                public void onError(Throwable e) {
-                    NestedLoopJoinNode.this.onError(e);
-                }
-            };
-        }
-
-        throw new IndexOutOfBoundsException();
-    }
-
-    private void pushLeft(RowT row) throws Exception {
-        assert downstream() != null;
-        assert waitingLeft > 0;
-
-        checkState();
-
-        waitingLeft--;
-
-        leftInBuf.add(row);
-
-        join();
-    }
-
-    private void pushRight(RowT row) throws Exception {
+    protected void pushRight(RowT row) throws Exception {
         assert downstream() != null;
         assert waitingRight > 0;
 
@@ -180,38 +80,6 @@
         }
     }
 
-    private void endLeft() throws Exception {
-        assert downstream() != null;
-        assert waitingLeft > 0;
-
-        checkState();
-
-        waitingLeft = NOT_WAITING;
-
-        join();
-    }
-
-    private void endRight() throws Exception {
-        assert downstream() != null;
-        assert waitingRight > 0;
-
-        checkState();
-
-        waitingRight = NOT_WAITING;
-
-        join();
-    }
-
-    protected Node<RowT> leftSource() {
-        return sources().get(0);
-    }
-
-    protected Node<RowT> rightSource() {
-        return sources().get(1);
-    }
-
-    protected abstract void join() throws Exception;
-
     /**
      * Create.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -257,8 +125,6 @@
     }
 
     private static class InnerJoin<RowT> extends NestedLoopJoinNode<RowT> {
-        private RowT left;
-
         private int rightIdx;
 
         /**
@@ -275,7 +141,6 @@
         /** {@inheritDoc} */
         @Override
         protected void rewindInternal() {
-            left = null;
             rightIdx = 0;
 
             super.rewindInternal();
@@ -335,8 +200,6 @@
         /** Whether current left row was matched or not. */
         private boolean matched;
 
-        private RowT left;
-
         private int rightIdx;
 
         /**
@@ -360,7 +223,6 @@
         @Override
         protected void rewindInternal() {
             matched = false;
-            left = null;
             rightIdx = 0;
 
             super.rewindInternal();
@@ -430,15 +292,13 @@
     }
 
     private static class RightJoin<RowT> extends NestedLoopJoinNode<RowT> {
-        /** Right row factory. */
+        /** Left row factory. */
         private final RowHandler.RowFactory<RowT> leftRowFactory;
 
         private @Nullable BitSet rightNotMatchedIndexes;
 
         private int lastPushedInd;
 
-        private RowT left;
-
         private int rightIdx;
 
         /**
@@ -462,7 +322,6 @@
         /** {@inheritDoc} */
         @Override
         protected void rewindInternal() {
-            left = null;
             rightNotMatchedIndexes = null;
             lastPushedInd = 0;
             rightIdx = 0;
@@ -573,8 +432,6 @@
 
         private int lastPushedInd;
 
-        private RowT left;
-
         private int rightIdx;
 
         /**
@@ -601,7 +458,6 @@
         /** {@inheritDoc} */
         @Override
         protected void rewindInternal() {
-            left = null;
             leftMatched = false;
             rightNotMatchedIndexes = null;
             lastPushedInd = 0;
@@ -714,8 +570,6 @@
     }
 
     private static class SemiJoin<RowT> extends NestedLoopJoinNode<RowT> {
-        private RowT left;
-
         private int rightIdx;
 
         /**
@@ -732,7 +586,6 @@
         /** {@inheritDoc} */
         @Override
         protected void rewindInternal() {
-            left = null;
             rightIdx = 0;
 
             super.rewindInternal();
@@ -786,8 +639,6 @@
     }
 
     private static class AntiJoin<RowT> extends NestedLoopJoinNode<RowT> {
-        private RowT left;
-
         private int rightIdx;
 
         /**
@@ -803,7 +654,6 @@
 
         @Override
         protected void rewindInternal() {
-            left = null;
             rightIdx = 0;
 
             super.rewindInternal();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
index 6a980e9..158c157 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
@@ -22,6 +22,7 @@
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
@@ -89,6 +90,12 @@
 
     /** {@inheritDoc} */
     @Override
+    public IgniteRel visit(IgniteHashJoin rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
         return processNode(rel);
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index a3cd8b3..e362a3c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -50,6 +50,7 @@
 import org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToHashIndexSpoolRule;
 import org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToSortedIndexSpoolRule;
 import org.apache.ignite.internal.sql.engine.rule.HashAggregateConverterRule;
+import org.apache.ignite.internal.sql.engine.rule.HashJoinConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.MergeJoinConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.NestedLoopJoinConverterRule;
@@ -227,6 +228,7 @@
 
             CorrelateToNestedLoopRule.INSTANCE,
             NestedLoopJoinConverterRule.INSTANCE,
+            HashJoinConverterRule.INSTANCE,
 
             ValuesConverterRule.INSTANCE,
             LogicalScanConverterRule.INDEX_SCAN,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteHashJoin.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteHashJoin.java
new file mode 100644
index 0000000..5ea0388
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteHashJoin.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rel;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+
+/**
+ * Relational operator that represent hash join algo.
+ */
+public class IgniteHashJoin extends AbstractIgniteJoin {
+    private static final String REL_TYPE_NAME = "HashJoin";
+
+    public IgniteHashJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right,
+            RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+        super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+    }
+
+    /** Constructor. */
+    public IgniteHashJoin(RelInput input) {
+        this(input.getCluster(),
+                input.getTraitSet().replace(IgniteConvention.INSTANCE),
+                input.getInputs().get(0),
+                input.getInputs().get(1),
+                input.getExpression("condition"),
+                Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)),
+                input.getEnum("joinType", JoinRelType.class));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory) planner.getCostFactory();
+
+        double leftRowCount = mq.getRowCount(getLeft());
+        double rightRowCount = mq.getRowCount(getRight());
+
+        if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) {
+            return planner.getCostFactory().makeInfiniteCost();
+        }
+
+        double rowCount = leftRowCount + rightRowCount;
+
+        int rightKeysSize = joinInfo.rightKeys.size();
+
+        double rightSize = rightRowCount * IgniteCost.AVERAGE_FIELD_SIZE * getRight().getRowType().getFieldCount();
+
+        double distRightRows = Util.first(mq.getDistinctRowCount(right, ImmutableBitSet.of(joinInfo.rightKeys), null), 0.9 * rightRowCount);
+
+        rightSize += distRightRows * rightKeysSize * IgniteCost.AVERAGE_FIELD_SIZE;
+
+        return costFactory.makeCost(rowCount, rowCount * IgniteCost.HASH_LOOKUP_COST, 0, rightSize, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType,
+            boolean semiJoinDone) {
+        return new IgniteHashJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteHashJoin(cluster, getTraitSet(), inputs.get(0), inputs.get(1), getCondition(),
+                getVariablesSet(), getJoinType());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getRelTypeName() {
+        return REL_TYPE_NAME;
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
index ddaf73d..5f4238c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
@@ -57,6 +57,11 @@
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}.
      */
+    T visit(IgniteHashJoin rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+     */
     T visit(IgniteCorrelatedNestedLoopJoin rel);
 
     /**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashJoinConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashJoinConverterRule.java
new file mode 100644
index 0000000..c2f7ae7
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashJoinConverterRule.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
+
+/**
+ * Hash join converter.
+ */
+public class HashJoinConverterRule extends AbstractIgniteConverterRule<LogicalJoin> {
+    public static final RelOptRule INSTANCE = new HashJoinConverterRule();
+
+    /**
+     * Creates a converter.
+     */
+    public HashJoinConverterRule() {
+        super(LogicalJoin.class, "HashJoinConverter");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        LogicalJoin logicalJoin = call.rel(0);
+
+        return !nullOrEmpty(logicalJoin.analyzeCondition().pairs())
+                && logicalJoin.analyzeCondition().isEqui() && acceptableConditions(logicalJoin.getCondition());
+    }
+
+    private static boolean acceptableConditions(RexNode node) {
+        RexVisitor<Void> v = new RexVisitorImpl<>(true) {
+            @Override
+            public Void visitCall(RexCall call) {
+                SqlKind opKind = call.getOperator().getKind();
+                if (opKind != SqlKind.EQUALS && opKind != SqlKind.AND) {
+                    throw Util.FoundOne.NULL;
+                }
+                return super.visitCall(call);
+            }
+        };
+
+        try {
+            node.accept(v);
+
+            return true;
+        } catch (Util.FoundOne e) {
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalJoin rel) {
+        RelOptCluster cluster = rel.getCluster();
+        RelTraitSet outTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
+        RelTraitSet leftInTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
+        RelTraitSet rightInTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
+        RelNode left = convert(rel.getLeft(), leftInTraits);
+        RelNode right = convert(rel.getRight(), rightInTraits);
+
+        return new IgniteHashJoin(cluster, outTraits, left, right, rel.getCondition(), rel.getVariablesSet(), rel.getJoinType());
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 569f286..3e32ab8 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -162,6 +162,20 @@
     }
 
     @Test
+    public void testHashJoin() {
+        addNodes("N0", "N1", "N2", "N3", "N4");
+
+        addTable("T1", "N1");
+        addTable("T2", "N1");
+        addTable("T2", "N2");
+
+        setRowCount("T1", 200);
+        setRowCount("T2", 100);
+
+        testRunner.runTest(this::initSchema, "hash_join.test");
+    }
+
+    @Test
     public void testTableIdentity() {
         addNodes("N0", "N1", "N2");
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
new file mode 100644
index 0000000..3be2f2f
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.calcite.rel.core.JoinRelType.ANTI;
+import static org.apache.calcite.rel.core.JoinRelType.FULL;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.calcite.rel.core.JoinRelType.SEMI;
+import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.Test;
+
+/** Common join execution test. */
+public abstract class AbstractJoinExecutionTest extends AbstractExecutionTest<Object[]> {
+    abstract JoinAlgo joinAlgo();
+
+    @Test
+    public void joinEmptyTables() {
+        verifyJoin(EMPTY, EMPTY, INNER, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, EMPTY, LEFT, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, EMPTY, RIGHT, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, EMPTY, FULL, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, EMPTY, SEMI, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, EMPTY, ANTI, EMPTY, joinAlgo());
+    }
+
+    @Test
+    public void joinEmptyLeftTable() {
+        Object[][] right = {
+                {1, "Core"},
+                {1, "OLD_Core"},
+                {2, "SQL"}
+        };
+
+        verifyJoin(EMPTY, right, INNER, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, right, LEFT, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, right, RIGHT, new Object[][]{
+                {null, null, "Core"},
+                {null, null, "OLD_Core"},
+                {null, null, "SQL"}
+        }, joinAlgo());
+        verifyJoin(EMPTY, right, FULL, new Object[][]{
+                {null, null, "Core"},
+                {null, null, "OLD_Core"},
+                {null, null, "SQL"}
+        }, joinAlgo());
+        verifyJoin(EMPTY, right, SEMI, EMPTY, joinAlgo());
+        verifyJoin(EMPTY, right, ANTI, EMPTY, joinAlgo());
+    }
+
+    @Test
+    public void joinEmptyRightTable() {
+        Object[][] left = {
+                {1, "Roman", null},
+                {2, "Igor", 1},
+                {3, "Alexey", 2}
+        };
+
+        verifyJoin(left, EMPTY, INNER, EMPTY, joinAlgo());
+        verifyJoin(left, EMPTY, LEFT, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", null},
+                {3, "Alexey", null}
+        }, joinAlgo());
+        verifyJoin(left, EMPTY, RIGHT, EMPTY, joinAlgo());
+        verifyJoin(left, EMPTY, FULL, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", null},
+                {3, "Alexey", null}
+        }, joinAlgo());
+        verifyJoin(left, EMPTY, SEMI, EMPTY, joinAlgo());
+        verifyJoin(left, EMPTY, ANTI, new Object[][]{
+                {1, "Roman"},
+                {2, "Igor"},
+                {3, "Alexey"}
+        }, joinAlgo());
+    }
+
+    @Test
+    public void joinOneToMany() {
+        Object[][] left = {
+                {1, "Roman", null},
+                {2, "Igor", 1},
+                {3, "Alexey", 2}
+        };
+
+        Object[][] right = {
+                {1, "Core"},
+                {1, "OLD_Core"},
+                {2, "SQL"},
+                {3, "Arch"}
+        };
+
+        verifyJoin(left, right, INNER, new Object[][]{
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"}
+        }, joinAlgo());
+        verifyJoin(left, right, LEFT, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"}
+        }, joinAlgo());
+        verifyJoin(left, right, RIGHT, new Object[][]{
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"},
+                {null, null, "Arch"}
+        }, joinAlgo());
+        verifyJoin(left, right, FULL, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"},
+                {null, null, "Arch"}
+        }, joinAlgo());
+        verifyJoin(left, right, SEMI, new Object[][]{
+                {2, "Igor"},
+                {3, "Alexey"}
+        }, joinAlgo());
+        verifyJoin(left, right, ANTI, new Object[][]{
+                {1, "Roman"}
+        }, joinAlgo());
+    }
+
+    @Test
+    public void testLeftJoin() {
+        //    select e.id, e.name, d.name as dep_name
+        //      from emp e
+        // left join dep d
+        //        on e.depno = d.depno
+
+        Object[][] persons = {
+                new Object[]{0, "Igor", 1},
+                new Object[]{1, "Roman", 2},
+                new Object[]{2, "Ivan", null},
+                new Object[]{3, "Alexey", 1}
+        };
+
+        Object[][] deps = {
+                new Object[]{1, "Core"},
+                new Object[]{2, "SQL"}
+        };
+
+        verifyJoin(persons, deps, LEFT, new Object[][]{
+                {0, "Igor", "Core"},
+                {1, "Roman", "SQL"},
+                {2, "Ivan", null},
+                {3, "Alexey", "Core"},
+        }, joinAlgo());
+    }
+
+    @Test
+    public void testSemiJoin() {
+        //    select d.name as dep_name
+        //      from dep d
+        // semi join emp e
+        //        on e.depno = d.depno
+
+        Object[][] persons = {
+                new Object[]{1, "Igor", 0},
+                new Object[]{2, "Roman", 1},
+                new Object[]{null, "Ivan", 2},
+                new Object[]{1, "Alexey", 3}
+        };
+
+        Object[][] deps = {
+                new Object[]{1, "Core", 1},
+                new Object[]{2, "SQL", 2},
+                new Object[]{3, "QA", 3}
+        };
+
+        verifyJoin(deps, persons, SEMI, new Object[][]{
+                {1, "Core"},
+                {2, "SQL"},
+        }, joinAlgo());
+    }
+
+    @Test
+    public void testAntiJoin() {
+        //    select d.name as dep_name
+        //      from dep d
+        // anti join emp e
+        //        on e.depno = d.depno
+
+        Object[][] persons = {
+                new Object[]{1, "Igor", },
+                new Object[]{2, "Roman", 1},
+                new Object[]{null, "Ivan", 2},
+                new Object[]{1, "Alexey", 3}
+        };
+
+        Object[][] deps = {
+                new Object[]{1, "Core", 1},
+                new Object[]{2, "SQL", 2},
+                new Object[]{3, "QA", 3}
+        };
+
+        verifyJoin(deps, persons, ANTI, new Object[][]{
+                {3, "QA"}
+        }, joinAlgo());
+    }
+
+    @Test
+    public void joinOneToMany2() {
+        Object[][] left = {
+                {1, "Roman", null},
+                {2, "Igor", 1},
+                {3, "Alexey", 2},
+                {4, "Ivan", 4},
+                {5, "Taras", 5},
+                {6, "Lisa", 6}
+        };
+
+        Object[][] right = {
+                {1, "Core"},
+                {1, "OLD_Core"},
+                {2, "SQL"},
+                {3, "QA"},
+                {5, "Arch"}
+        };
+
+        verifyJoin(left, right, INNER, new Object[][]{
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"},
+                {5, "Taras", "Arch"}
+        }, joinAlgo());
+        verifyJoin(left, right, LEFT, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"},
+                {4, "Ivan", null},
+                {5, "Taras", "Arch"},
+                {6, "Lisa", null}
+        }, joinAlgo());
+        verifyJoin(left, right, RIGHT, new Object[][]{
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"},
+                {5, "Taras", "Arch"},
+                {null, null, "QA"}
+        }, joinAlgo());
+        verifyJoin(left, right, FULL, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Alexey", "SQL"},
+                {4, "Ivan", null},
+                {5, "Taras", "Arch"},
+                {6, "Lisa", null},
+                {null, null, "QA"}
+        }, joinAlgo());
+        verifyJoin(left, right, SEMI, new Object[][]{
+                {2, "Igor"},
+                {3, "Alexey"},
+                {5, "Taras"}
+        }, joinAlgo());
+        verifyJoin(left, right, ANTI, new Object[][]{
+                {1, "Roman"},
+                {4, "Ivan"},
+                {6, "Lisa"}
+        }, joinAlgo());
+    }
+
+    @Test
+    public void joinManyToMany() {
+        Object[][] left = {
+                {1, "Roman", null},
+                {2, "Igor", 1},
+                {3, "Taras", 1},
+                {4, "Alexey", 2},
+                {5, "Ivan", 4},
+                {6, "Andrey", 4}
+        };
+
+        Object[][] right = {
+                {1, "Core"},
+                {1, "OLD_Core"},
+                {2, "SQL"},
+                {3, "Arch"},
+                {4, "QA"},
+                {4, "OLD_QA"},
+        };
+
+        verifyJoin(left, right, INNER, new Object[][]{
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Taras", "Core"},
+                {3, "Taras", "OLD_Core"},
+                {4, "Alexey", "SQL"},
+                {5, "Ivan", "OLD_QA"},
+                {5, "Ivan", "QA"},
+                {6, "Andrey", "OLD_QA"},
+                {6, "Andrey", "QA"}
+        }, joinAlgo());
+        verifyJoin(left, right, LEFT, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Taras", "Core"},
+                {3, "Taras", "OLD_Core"},
+                {4, "Alexey", "SQL"},
+                {5, "Ivan", "OLD_QA"},
+                {5, "Ivan", "QA"},
+                {6, "Andrey", "OLD_QA"},
+                {6, "Andrey", "QA"}
+        }, joinAlgo());
+        verifyJoin(left, right, RIGHT, new Object[][]{
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Taras", "Core"},
+                {3, "Taras", "OLD_Core"},
+                {4, "Alexey", "SQL"},
+                {5, "Ivan", "OLD_QA"},
+                {5, "Ivan", "QA"},
+                {6, "Andrey", "OLD_QA"},
+                {6, "Andrey", "QA"},
+                {null, null, "Arch"}
+        }, joinAlgo());
+        verifyJoin(left, right, FULL, new Object[][]{
+                {1, "Roman", null},
+                {2, "Igor", "Core"},
+                {2, "Igor", "OLD_Core"},
+                {3, "Taras", "Core"},
+                {3, "Taras", "OLD_Core"},
+                {4, "Alexey", "SQL"},
+                {5, "Ivan", "OLD_QA"},
+                {5, "Ivan", "QA"},
+                {6, "Andrey", "OLD_QA"},
+                {6, "Andrey", "QA"},
+                {null, null, "Arch"}
+        }, joinAlgo());
+        verifyJoin(left, right, SEMI, new Object[][]{
+                {2, "Igor"},
+                {3, "Taras"},
+                {4, "Alexey"},
+                {5, "Ivan"},
+                {6, "Andrey"},
+        }, joinAlgo());
+        verifyJoin(left, right, ANTI, new Object[][]{
+                {1, "Roman"}
+        }, joinAlgo());
+    }
+
+    enum JoinAlgo {
+        HASH,
+        NESTED_LOOP
+    }
+
+    /**
+     * Creates execution tree and executes it. Then compares the result of the execution with the given one.
+     *
+     * @param left     Data for left table.
+     * @param right    Data for right table.
+     * @param joinType Join type.
+     * @param expRes   Expected result.
+     */
+    private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes, JoinAlgo algo) {
+        ExecutionContext<Object[]> ctx = executionContext(true);
+
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+
+        RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+        ScanNode<Object[]> leftNode = new ScanNode<>(ctx, Arrays.asList(left));
+
+        RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+        ScanNode<Object[]> rightNode = new ScanNode<>(ctx, Arrays.asList(right));
+
+        RelDataType outType;
+        if (setOf(SEMI, ANTI).contains(joinType)) {
+            outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+                    NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+        } else {
+            outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+                    NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.INT32, NativeTypes.STRING));
+        }
+
+        RowHandler<Object[]> hnd = ctx.rowHandler();
+
+        AbstractRightMaterializedJoinNode<Object[]> join;
+
+        if (algo == JoinAlgo.NESTED_LOOP) {
+            join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, joinType,
+                    (r1, r2) -> getFieldFromBiRows(hnd, 2, r1, r2) == getFieldFromBiRows(hnd, 3, r1, r2));
+        } else {
+            join = HashJoinNode.create(ctx, outType, leftType, rightType, joinType,
+                    JoinInfo.of(ImmutableIntList.of(2), ImmutableIntList.of(0)));
+        }
+
+        join.register(asList(leftNode, rightNode));
+
+        ProjectNode<Object[]> project;
+        SortNode<Object[]> sortNode;
+        if (setOf(SEMI, ANTI).contains(joinType)) {
+            project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1]});
+            RelCollation collation = RelCollations.of(ImmutableIntList.of(0, 1));
+            Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+            sortNode = new SortNode<>(ctx, cmp);
+        } else {
+            project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
+            RelCollation collation = RelCollations.of(ImmutableIntList.of(0, 1, 4));
+            Comparator<Object[]> cmp = ctx.expressionFactory().comparator(collation);
+
+            sortNode = new SortNode<>(ctx, cmp);
+        }
+
+        sortNode.register(join);
+
+        project.register(sortNode);
+
+        // first, let's rewind just created tree -- it's how it actually being executed
+        project.rewind();
+
+        int times = 2;
+        do {
+            TestDownstream<Object[]> downstream = new TestDownstream<>();
+            project.onRegister(downstream);
+
+            ctx.execute(() -> project.request(1024), project::onError);
+
+            Object[][] res = await(downstream.result()).toArray(EMPTY);
+
+            assertArrayEquals(expRes, res);
+
+            // now let's rewind and restart test to check whether all state has been correctly reset
+            project.rewind();
+        } while (times-- > 0);
+    }
+
+    /**
+     * Creates {@link Set set} from provided items.
+     *
+     * @param items Items.
+     * @return New set.
+     */
+    @SafeVarargs
+    private static <T> Set<T> setOf(T... items) {
+        return new HashSet<>(Arrays.asList(items));
+    }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 162b8b0..24bc4fc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -19,12 +19,9 @@
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
-import static org.apache.calcite.rel.core.JoinRelType.ANTI;
 import static org.apache.calcite.rel.core.JoinRelType.FULL;
 import static org.apache.calcite.rel.core.JoinRelType.INNER;
 import static org.apache.calcite.rel.core.JoinRelType.LEFT;
-import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
-import static org.apache.calcite.rel.core.JoinRelType.SEMI;
 import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
@@ -223,64 +220,6 @@
     }
 
     @Test
-    public void testRightJoin() {
-        //     select e.id, e.name, d.name as dep_name
-        //       from dep d
-        // right join emp e
-        //         on e.depno = d.depno
-
-        ExecutionContext<Object[]> ctx = executionContext(true);
-
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
-                new Object[]{0, "Igor", 1},
-                new Object[]{1, "Roman", 2},
-                new Object[]{2, "Ivan", null},
-                new Object[]{3, "Alexey", 1}
-        ));
-
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
-                new Object[]{1, "Core"},
-                new Object[]{2, "SQL"},
-                new Object[]{3, "QA"}
-        ));
-
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-
-        RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-        RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-
-        RowHandler<Object[]> hnd = ctx.rowHandler();
-
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, RIGHT,
-                (r1, r2) -> getFieldFromBiRows(hnd, 0, r1, r2) == getFieldFromBiRows(hnd, 4, r1, r2));
-        join.register(asList(deps, persons));
-
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]});
-        project.register(join);
-
-        RootNode<Object[]> node = new RootNode<>(ctx);
-        node.register(project);
-
-        assert node.hasNext();
-
-        ArrayList<Object[]> rows = new ArrayList<>();
-
-        while (node.hasNext()) {
-            rows.add(node.next());
-        }
-
-        assertEquals(4, rows.size());
-
-        assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
-        assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(1));
-        assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(2));
-        assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(3));
-    }
-
-    @Test
     public void testFullOuterJoin() {
         //          select e.id, e.name, d.name as dep_name
         //            from emp e
@@ -339,117 +278,6 @@
         assertArrayEquals(new Object[]{null, null, "QA"}, rows.get(4));
     }
 
-    @Test
-    public void testSemiJoin() {
-        //    select d.name as dep_name
-        //      from dep d
-        // semi join emp e
-        //        on e.depno = d.depno
-
-        ExecutionContext<Object[]> ctx = executionContext(true);
-
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
-                new Object[]{0, "Igor", 1},
-                new Object[]{1, "Roman", 2},
-                new Object[]{2, "Ivan", null},
-                new Object[]{3, "Alexey", 1}
-        ));
-
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
-                new Object[]{1, "Core"},
-                new Object[]{2, "SQL"},
-                new Object[]{3, "QA"}
-        ));
-
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-
-        RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-
-        RowHandler<Object[]> hnd = ctx.rowHandler();
-
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, SEMI,
-                (r1, r2) -> getFieldFromBiRows(hnd, 0, r1, r2) == getFieldFromBiRows(hnd, 4, r1, r2));
-        join.register(asList(deps, persons));
-
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[1]});
-        project.register(join);
-
-        RootNode<Object[]> node = new RootNode<>(ctx);
-        node.register(project);
-
-        assert node.hasNext();
-
-        ArrayList<Object[]> rows = new ArrayList<>();
-
-        while (node.hasNext()) {
-            rows.add(node.next());
-        }
-
-        assertEquals(2, rows.size());
-
-        assertArrayEquals(new Object[]{"Core"}, rows.get(0));
-        assertArrayEquals(new Object[]{"SQL"}, rows.get(1));
-    }
-
-    @Test
-    public void testAntiJoin() {
-        //    select d.name as dep_name
-        //      from dep d
-        // anti join emp e
-        //        on e.depno = d.depno
-
-        ExecutionContext<Object[]> ctx = executionContext(true);
-
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
-                new Object[]{0, "Igor", 1},
-                new Object[]{1, "Roman", 2},
-                new Object[]{2, "Ivan", null},
-                new Object[]{3, "Alexey", 1}
-        ));
-
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
-                new Object[]{1, "Core"},
-                new Object[]{2, "SQL"},
-                new Object[]{3, "QA"}
-        ));
-
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-
-        RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-
-        RowHandler<Object[]> hnd = ctx.rowHandler();
-
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, ANTI,
-                (r1, r2) -> getFieldFromBiRows(hnd, 0, r1, r2) == getFieldFromBiRows(hnd, 4, r1, r2));
-        join.register(asList(deps, persons));
-
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[1]});
-        project.register(join);
-
-        RootNode<Object[]> node = new RootNode<>(ctx);
-        node.register(project);
-
-        assert node.hasNext();
-
-        ArrayList<Object[]> rows = new ArrayList<>();
-
-        while (node.hasNext()) {
-            rows.add(node.next());
-        }
-
-        assertEquals(1, rows.size());
-
-        assertArrayEquals(new Object[]{"QA"}, rows.get(0));
-    }
-
     /**
      * TestCorrelatedNestedLoopJoin.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
new file mode 100644
index 0000000..3412ce7
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.Test;
+
+/** Yash join execution tests. */
+public class HashJoinExecutionTest extends AbstractJoinExecutionTest {
+    @Override
+    JoinAlgo joinAlgo() {
+        return JoinAlgo.HASH;
+    }
+
+    @Test
+    public void testHashJoinRewind() {
+        ExecutionContext<Object[]> ctx = executionContext(true);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+                new Object[]{0, "Igor", 1},
+                new Object[]{1, "Roman", 2},
+                new Object[]{2, "Ivan", 5},
+                new Object[]{3, "Alexey", 1}
+        ));
+
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+                new Object[]{1, "Core"},
+                new Object[]{2, "SQL"},
+                new Object[]{3, "QA"}
+        ));
+
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+
+        RelDataType outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+        RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+        RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
+                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+
+        AbstractRightMaterializedJoinNode<Object[]> join = HashJoinNode.create(ctx, outType, leftType, rightType, RIGHT,
+                JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(2)));
+
+        join.register(asList(deps, persons));
+
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]});
+        project.register(join);
+
+        RootRewindable<Object[]> node = new RootRewindable<>(ctx);
+        node.register(project);
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext()) {
+            rows.add(node.next());
+        }
+
+        assertEquals(4, rows.size());
+
+        Object[][] expected = {
+                {0, "Igor", "Core"},
+                {3, "Alexey", "Core"},
+                {1, "Roman", "SQL"},
+                {2, "Ivan", null}
+        };
+
+        assert2DimArrayEquals(expected, rows);
+
+        List<Object[]> depsRes = new ArrayList<>();
+        depsRes.add(new Object[]{5, "QA"});
+
+        deps = new ScanNode<>(ctx, depsRes);
+
+        join.register(asList(deps, persons));
+
+        node.rewind();
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rowsAfterRewind = new ArrayList<>();
+
+        while (node.hasNext()) {
+            rowsAfterRewind.add(node.next());
+        }
+
+        assertEquals(4, rowsAfterRewind.size());
+
+        Object[][] expectedAfterRewind = {
+                {2, "Ivan", "QA"},
+                {1, "Roman", null},
+                {0, "Igor", null},
+                {3, "Alexey", null},
+        };
+
+        assert2DimArrayEquals(expectedAfterRewind, rowsAfterRewind);
+    }
+
+    static void assert2DimArrayEquals(Object[][] expected, ArrayList<Object[]> actual) {
+        assertEquals(expected.length, actual.size(), "expected length: " + expected.length + ", actual length: " + actual.size());
+
+        int length = expected.length;
+
+        for (int i = 0; i < length; ++i) {
+            Object[] exp = expected[i];
+            Object[] act = actual.get(i);
+
+            assertEquals(exp.length, act.length, "expected length: " + exp.length + ", actual length: " + act.length);
+            assertArrayEquals(exp, act);
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index d66af19..e5c97dd 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -17,362 +17,10 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
-import static org.apache.calcite.rel.core.JoinRelType.ANTI;
-import static org.apache.calcite.rel.core.JoinRelType.FULL;
-import static org.apache.calcite.rel.core.JoinRelType.INNER;
-import static org.apache.calcite.rel.core.JoinRelType.LEFT;
-import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
-import static org.apache.calcite.rel.core.JoinRelType.SEMI;
-import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
-import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.type.NativeTypes;
-import org.junit.jupiter.api.Test;
-
-/**
- * NestedLoopJoinExecutionTest.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
-public class NestedLoopJoinExecutionTest extends AbstractExecutionTest<Object[]> {
-    @Test
-    public void joinEmptyTables() {
-        verifyJoin(EMPTY, EMPTY, INNER, EMPTY);
-        verifyJoin(EMPTY, EMPTY, LEFT, EMPTY);
-        verifyJoin(EMPTY, EMPTY, RIGHT, EMPTY);
-        verifyJoin(EMPTY, EMPTY, FULL, EMPTY);
-        verifyJoin(EMPTY, EMPTY, SEMI, EMPTY);
-        verifyJoin(EMPTY, EMPTY, ANTI, EMPTY);
-    }
-
-    @Test
-    public void joinEmptyLeftTable() {
-        Object[][] right = {
-                {1, "Core"},
-                {1, "OLD_Core"},
-                {2, "SQL"}
-        };
-
-        verifyJoin(EMPTY, right, INNER, EMPTY);
-        verifyJoin(EMPTY, right, LEFT, EMPTY);
-        verifyJoin(EMPTY, right, RIGHT, new Object[][]{
-                {null, null, "Core"},
-                {null, null, "OLD_Core"},
-                {null, null, "SQL"}
-        });
-        verifyJoin(EMPTY, right, FULL, new Object[][]{
-                {null, null, "Core"},
-                {null, null, "OLD_Core"},
-                {null, null, "SQL"}
-        });
-        verifyJoin(EMPTY, right, SEMI, EMPTY);
-        verifyJoin(EMPTY, right, ANTI, EMPTY);
-    }
-
-    @Test
-    public void joinEmptyRightTable() {
-        Object[][] left = {
-                {1, "Roman", null},
-                {2, "Igor", 1},
-                {3, "Alexey", 2}
-        };
-
-        verifyJoin(left, EMPTY, INNER, EMPTY);
-        verifyJoin(left, EMPTY, LEFT, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", null},
-                {3, "Alexey", null}
-        });
-        verifyJoin(left, EMPTY, RIGHT, EMPTY);
-        verifyJoin(left, EMPTY, FULL, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", null},
-                {3, "Alexey", null}
-        });
-        verifyJoin(left, EMPTY, SEMI, EMPTY);
-        verifyJoin(left, EMPTY, ANTI, new Object[][]{
-                {1, "Roman"},
-                {2, "Igor"},
-                {3, "Alexey"}
-        });
-    }
-
-    @Test
-    public void joinOneToMany() {
-        Object[][] left = {
-                {1, "Roman", null},
-                {2, "Igor", 1},
-                {3, "Alexey", 2}
-        };
-
-        Object[][] right = {
-                {1, "Core"},
-                {1, "OLD_Core"},
-                {2, "SQL"},
-                {3, "Arch"}
-        };
-
-        verifyJoin(left, right, INNER, new Object[][]{
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"}
-        });
-        verifyJoin(left, right, LEFT, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"}
-        });
-        verifyJoin(left, right, RIGHT, new Object[][]{
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"},
-                {null, null, "Arch"}
-        });
-        verifyJoin(left, right, FULL, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"},
-                {null, null, "Arch"}
-        });
-        verifyJoin(left, right, SEMI, new Object[][]{
-                {2, "Igor"},
-                {3, "Alexey"}
-        });
-        verifyJoin(left, right, ANTI, new Object[][]{
-                {1, "Roman"}
-        });
-    }
-
-    @Test
-    public void joinOneToMany2() {
-        Object[][] left = {
-                {1, "Roman", null},
-                {2, "Igor", 1},
-                {3, "Alexey", 2},
-                {4, "Ivan", 4},
-                {5, "Taras", 5},
-                {6, "Lisa", 6}
-        };
-
-        Object[][] right = {
-                {1, "Core"},
-                {1, "OLD_Core"},
-                {2, "SQL"},
-                {3, "QA"},
-                {5, "Arch"}
-        };
-
-        verifyJoin(left, right, INNER, new Object[][]{
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"},
-                {5, "Taras", "Arch"}
-        });
-        verifyJoin(left, right, LEFT, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"},
-                {4, "Ivan", null},
-                {5, "Taras", "Arch"},
-                {6, "Lisa", null}
-        });
-        verifyJoin(left, right, RIGHT, new Object[][]{
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"},
-                {5, "Taras", "Arch"},
-                {null, null, "QA"}
-        });
-        verifyJoin(left, right, FULL, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Alexey", "SQL"},
-                {4, "Ivan", null},
-                {5, "Taras", "Arch"},
-                {6, "Lisa", null},
-                {null, null, "QA"}
-        });
-        verifyJoin(left, right, SEMI, new Object[][]{
-                {2, "Igor"},
-                {3, "Alexey"},
-                {5, "Taras"}
-        });
-        verifyJoin(left, right, ANTI, new Object[][]{
-                {1, "Roman"},
-                {4, "Ivan"},
-                {6, "Lisa"}
-        });
-    }
-
-    @Test
-    public void joinManyToMany() {
-        Object[][] left = {
-                {1, "Roman", null},
-                {2, "Igor", 1},
-                {3, "Taras", 1},
-                {4, "Alexey", 2},
-                {5, "Ivan", 4},
-                {6, "Andrey", 4}
-        };
-
-        Object[][] right = {
-                {1, "Core"},
-                {1, "OLD_Core"},
-                {2, "SQL"},
-                {3, "Arch"},
-                {4, "QA"},
-                {4, "OLD_QA"},
-        };
-
-        verifyJoin(left, right, INNER, new Object[][]{
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Taras", "Core"},
-                {3, "Taras", "OLD_Core"},
-                {4, "Alexey", "SQL"},
-                {5, "Ivan", "QA"},
-                {5, "Ivan", "OLD_QA"},
-                {6, "Andrey", "QA"},
-                {6, "Andrey", "OLD_QA"}
-        });
-        verifyJoin(left, right, LEFT, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Taras", "Core"},
-                {3, "Taras", "OLD_Core"},
-                {4, "Alexey", "SQL"},
-                {5, "Ivan", "QA"},
-                {5, "Ivan", "OLD_QA"},
-                {6, "Andrey", "QA"},
-                {6, "Andrey", "OLD_QA"}
-        });
-        verifyJoin(left, right, RIGHT, new Object[][]{
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Taras", "Core"},
-                {3, "Taras", "OLD_Core"},
-                {4, "Alexey", "SQL"},
-                {5, "Ivan", "QA"},
-                {5, "Ivan", "OLD_QA"},
-                {6, "Andrey", "QA"},
-                {6, "Andrey", "OLD_QA"},
-                {null, null, "Arch"}
-        });
-        verifyJoin(left, right, FULL, new Object[][]{
-                {1, "Roman", null},
-                {2, "Igor", "Core"},
-                {2, "Igor", "OLD_Core"},
-                {3, "Taras", "Core"},
-                {3, "Taras", "OLD_Core"},
-                {4, "Alexey", "SQL"},
-                {5, "Ivan", "QA"},
-                {5, "Ivan", "OLD_QA"},
-                {6, "Andrey", "QA"},
-                {6, "Andrey", "OLD_QA"},
-                {null, null, "Arch"}
-        });
-        verifyJoin(left, right, SEMI, new Object[][]{
-                {2, "Igor"},
-                {3, "Taras"},
-                {4, "Alexey"},
-                {5, "Ivan"},
-                {6, "Andrey"},
-        });
-        verifyJoin(left, right, ANTI, new Object[][]{
-                {1, "Roman"}
-        });
-    }
-
-    /**
-     * Creates execution tree and executes it. Then compares the result of the execution with the given one.
-     *
-     * @param left     Data for left table.
-     * @param right    Data for right table.
-     * @param joinType Join type.
-     * @param expRes   Expected result.
-     */
-    private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
-        ExecutionContext<Object[]> ctx = executionContext(true);
-
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-
-        RelDataType leftType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        ScanNode<Object[]> leftNode = new ScanNode<>(ctx, Arrays.asList(left));
-
-        RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-        ScanNode<Object[]> rightNode = new ScanNode<>(ctx, Arrays.asList(right));
-
-        RelDataType outType;
-        if (setOf(SEMI, ANTI).contains(joinType)) {
-            outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                    NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-        } else {
-            outType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf,
-                    NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, NativeTypes.INT32, NativeTypes.STRING));
-        }
-
-        RowHandler<Object[]> hnd = ctx.rowHandler();
-
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, outType, leftType, rightType, joinType,
-                (r1, r2) -> getFieldFromBiRows(hnd, 2, r1, r2) == getFieldFromBiRows(hnd, 3, r1, r2));
-        join.register(asList(leftNode, rightNode));
-
-        ProjectNode<Object[]> project;
-        if (setOf(SEMI, ANTI).contains(joinType)) {
-            project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1]});
-        } else {
-            project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
-        }
-        project.register(join);
-
-        // first, let's rewind just created tree -- it's how it actually being executed
-        project.rewind();
-
-        int times = 2;
-        do {
-            TestDownstream<Object[]> downstream = new TestDownstream<>();
-            project.onRegister(downstream);
-
-            ctx.execute(() -> project.request(1024), project::onError);
-
-            assertArrayEquals(expRes, await(downstream.result()).toArray(EMPTY));
-
-            // now let's rewind and restart test to check whether all state has been correctly reset
-            project.rewind();
-        } while (times-- > 0);
-    }
-
-    /**
-     * Creates {@link Set set} from provided items.
-     *
-     * @param items Items.
-     * @return New set.
-     */
-    @SafeVarargs
-    private static <T> Set<T> setOf(T... items) {
-        return new HashSet<>(Arrays.asList(items));
-    }
-
+/** Nested loop join execution tests. */
+public class NestedLoopJoinExecutionTest extends AbstractJoinExecutionTest {
     @Override
-    protected RowHandler<Object[]> rowHandler() {
-        return ArrayRowHandler.INSTANCE;
+    JoinAlgo joinAlgo() {
+        return JoinAlgo.NESTED_LOOP;
     }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index ded0e04..5a53898 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -270,7 +270,8 @@
      */
     @Test
     public void noSortAppendingWithCorrectCollation() throws Exception {
-        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+                "HashJoinConverter"};
 
         assertPlan(TestCase.CASE_16,
                 not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index fec1474..2d7e800 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -251,7 +251,8 @@
      */
     @Test
     public void noSortAppendingWithCorrectCollation() throws Exception {
-        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+                "HashJoinConverter"};
 
         assertPlan(TestCase.CASE_16,
                 nodeOrAnyChild(isInstanceOf(IgniteSort.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 8953888..eb6c589 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -258,7 +258,8 @@
      */
     @Test
     public void noSortAppendingWithCorrectCollation() throws Exception {
-        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+                "HashJoinConverter"};
 
         assertPlan(TestCase.CASE_16,
                 not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index 5f5474a..bfd8e97 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -61,7 +61,7 @@
         IgniteRel phys = physicalPlan(
                 sql,
                 publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter"
+                "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter"
         );
 
         System.out.println("+++ " + RelOptUtil.toString(phys));
@@ -98,7 +98,7 @@
                 sql,
                 publicSchema,
                 Objects::nonNull,
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule"
+                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule", "HashJoinConverter"
         );
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
index 59dea6c..a9820d6 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
@@ -116,7 +116,7 @@
 
         String sql = "SELECT l.*, r.* FROM left_tbl l JOIN right_tbl r ON l.val0 = r.val0 AND l.val1 = r.val1";
 
-        RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter");
+        RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter");
 
         IgniteIndexScan scan = findFirstNode(phys, byClass(IgniteIndexScan.class));
 
@@ -136,7 +136,7 @@
 
         String sql = "SELECT l.id FROM left_tbl l JOIN right_tbl r ON l.val0 = r.val0";
 
-        RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter");
+        RelNode phys = physicalPlan(sql, schema, "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter");
 
         IgniteIndexScan scan = findFirstNode(phys, byClass(IgniteIndexScan.class));
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
index e49c9a5..621977e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
@@ -58,7 +58,7 @@
         IgniteRel phys = physicalPlan(
                 sql,
                 publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule"
+                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule", "HashJoinConverter"
         );
 
         System.out.println("+++\n" + RelOptUtil.toString(phys));
@@ -89,7 +89,7 @@
         IgniteRel phys = physicalPlan(
                 sql,
                 publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule"
+                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule", "HashJoinConverter"
         );
 
         IgniteHashIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteHashIndexSpool.class));
@@ -122,7 +122,7 @@
         IgniteRel phys = physicalPlan(
                 sql,
                 publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter"
+                "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter"
         );
 
         IgniteHashIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteHashIndexSpool.class));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashJoinPlannerTest.java
new file mode 100644
index 0000000..e602e9c
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashJoinPlannerTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static org.apache.ignite.internal.sql.engine.planner.CorrelatedSubqueryPlannerTest.createTestTable;
+import static org.apache.ignite.internal.sql.engine.planner.JoinColocationPlannerTest.complexTbl;
+import static org.apache.ignite.internal.sql.engine.planner.JoinColocationPlannerTest.simpleTable;
+import static org.apache.ignite.internal.sql.engine.planner.JoinColocationPlannerTest.simpleTableHashPk;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.ignite.internal.sql.engine.rel.IgniteHashJoin;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** HashJoin planner test. */
+public class HashJoinPlannerTest extends AbstractPlannerTest {
+    private static final String[] disabledRules = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "MergeJoinConverter"};
+
+    private static final String[] joinTypes = {"LEFT", "RIGHT", "INNER", "FULL OUTER"};
+
+    /**
+     * Hash join need to preserve left collation.
+     */
+    @Test
+    public void hashJoinCheckLeftCollationsPropagation() throws Exception {
+        IgniteTable tbl1 = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
+        IgniteTable tbl2 = complexTbl("TEST_TBL_CMPLX");
+
+        IgniteSchema schema = createSchema(tbl1, tbl2);
+
+        String sql = "select t1.ID, t2.ID1 "
+                + "from TEST_TBL_CMPLX t2 "
+                + "join TEST_TBL t1 on t1.id = t2.id1 "
+                + "order by t2.ID1 NULLS LAST, t2.ID2 NULLS LAST";
+
+        // Only hash join
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter",
+                "CorrelatedNestedLoopJoin", "MergeJoinConverter", "JoinCommuteRule");
+
+        IgniteHashJoin join = findFirstNode(phys, byClass(IgniteHashJoin.class));
+        List<RelNode> joinNodes = findNodes(phys, byClass(IgniteHashJoin.class));
+        List<RelNode> sortNodes = findNodes(phys, byClass(IgniteSort.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, sortNodes.size(), equalTo(0));
+        assertThat(invalidPlanMsg, joinNodes.size(), equalTo(1));
+        assertThat(invalidPlanMsg, join, notNullValue());
+    }
+
+    /**
+     * Hash join erase right collation.
+     */
+    @Test
+    public void hashJoinCheckRightCollations() throws Exception {
+        IgniteTable tbl1 = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
+        IgniteTable tbl2 = complexTbl("TEST_TBL_CMPLX");
+
+        IgniteSchema schema = createSchema(tbl1, tbl2);
+
+        String sql = "select t1.ID, t2.ID1 "
+                + "from TEST_TBL t1 "
+                + "join TEST_TBL_CMPLX t2 on t1.id = t2.id1 "
+                + "order by t2.ID1 NULLS LAST, t2.ID2 NULLS LAST";
+
+        // Only hash join
+        IgniteRel phys = physicalPlan(sql, schema, "NestedLoopJoinConverter",
+                "CorrelatedNestedLoopJoin", "MergeJoinConverter", "JoinCommuteRule");
+
+        IgniteHashJoin join = findFirstNode(phys, byClass(IgniteHashJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, sortOnTopOfJoin(phys), notNullValue());
+    }
+
+    @Test
+    public void hashJoinWinsOnLeftSkewedInput() throws Exception {
+        IgniteTable thinTblSortedPk = simpleTable("THIN_TBL", 10);
+        IgniteTable thinTblHashPk = simpleTableHashPk("THIN_TBL_HASH_PK", 10);
+        IgniteTable thickTblSortedPk = simpleTable("THICK_TBL", 100_000);
+        IgniteTable thickTblHashPk = simpleTableHashPk("THICK_TBL_HASH_PK", 100_000);
+
+        IgniteSchema schema = createSchema(thinTblSortedPk, thickTblSortedPk, thinTblHashPk, thickTblHashPk);
+
+        String sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+                + "from THICK_TBL t1 " // left
+                + "join THIN_TBL t2 on t1.ID2 = t2.ID2 "; // right
+
+        assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), "JoinCommuteRule");
+
+        sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+                + "from THIN_TBL t1 " // left
+                + "join THICK_TBL t2 on t1.ID2 = t2.ID2 "; // right
+
+        assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).negate()), "JoinCommuteRule");
+
+        sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+                + "from THIN_TBL_HASH_PK t1 " // left
+                + "join THICK_TBL_HASH_PK t2 on t1.ID = t2.ID "; // right
+
+        assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), "JoinCommuteRule");
+
+        // merge join can consume less cpu in such a case
+        sql = "select t1.ID, t1.ID2, t2.ID, t2.ID2 "
+                + "from THIN_TBL t1 " // left
+                + "join THICK_TBL t2 on t1.ID = t2.ID "; // right
+
+        assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).negate()), "JoinCommuteRule");
+    }
+
+    private static @Nullable IgniteSort sortOnTopOfJoin(IgniteRel root) {
+        List<IgniteSort> sortNodes = findNodes(root, byClass(IgniteSort.class)
+                .and(node -> node.getInputs().size() == 1 && node.getInput(0) instanceof Join));
+
+        if (sortNodes.size() > 1) {
+            throw new AssertionError("Unexpected count of sort nodes: exp<=1, act=" + sortNodes.size());
+        }
+
+        return sortNodes.isEmpty() ? null : sortNodes.get(0);
+    }
+
+    /** Check that only appropriate conditions are acceptable for hash join. */
+    @ParameterizedTest()
+    @MethodSource("joinConditions")
+    @SuppressWarnings("ThrowableNotThrown")
+    public void hashJoinAppliedConditions(String sql, boolean canBePlanned) throws Exception {
+        IgniteTable tbl = createTestTable("ID", "C1");
+
+        IgniteSchema schema = createSchema(tbl);
+
+        for (String type : joinTypes) {
+            String sql0 = String.format(sql, type);
+
+            if (canBePlanned) {
+                assertPlan(sql0, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), disabledRules);
+            } else {
+                IgniteTestUtils.assertThrowsWithCause(() -> physicalPlan(sql0, schema, disabledRules),
+                        CannotPlanException.class,
+                        "There are not enough rules");
+            }
+        }
+    }
+
+    private static Stream<Arguments> joinConditions() {
+        return Stream.of(
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = t2.c1", true),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 using(c1)", true),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = 1", false),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 ON t1.id is not distinct from t2.c1", false),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = ?", false),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = OCTET_LENGTH('TEST')", false),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = LOG10(t1.c1)", false),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = t2.c1 and t1.ID > t2.ID", false),
+                Arguments.of("select t1.c1 from t1 %s join t1 t2 on t1.c1 = 1 and t2.c1 = 1", false)
+        );
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
index cc77935..a1c2850 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
@@ -160,7 +160,7 @@
         assertPlan(query, igniteSchema, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
                         .and(nodeOrAnyChild(new TableScanWithProjection(expected.lhs)))
                         .and(nodeOrAnyChild(new TableScanWithProjection(expected.rhs)))
-        ));
+        ), "HashJoinConverter", "NestedLoopJoinConverter");
     }
 
     /** Nested loop join - casts are added to condition operands. **/
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 6d18730..06061a3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -18,15 +18,18 @@
 package org.apache.ignite.internal.sql.engine.planner;
 
 import static org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.util.List;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.rel.AbstractIgniteJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
@@ -39,6 +42,8 @@
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Test suite to verify join colocation.
@@ -48,7 +53,7 @@
      * Join of the same tables with a simple affinity is expected to be colocated.
      */
     @Test
-    public void joinSameTableSimpleAff() throws Exception {
+    public void joinSameTableSimpleAffMergeJoin() throws Exception {
         IgniteTable tbl = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
 
         IgniteSchema schema = createSchema(tbl);
@@ -57,7 +62,7 @@
                 + "from TEST_TBL t1 "
                 + "join TEST_TBL t2 on t1.id = t2.id";
 
-        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
 
         IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
 
@@ -70,10 +75,37 @@
     }
 
     /**
-     * Join of the same tables with a complex affinity is expected to be colocated.
+     * Join of the same tables with a simple affinity is expected to be colocated.
      */
     @Test
-    public void joinSameTableComplexAff() throws Exception {
+    public void joinSameTableSimpleAffHashJoin() throws Exception {
+        IgniteTable tbl = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
+
+        IgniteSchema schema = createSchema(tbl);
+
+        String sql = "select count(*) "
+                + "from TEST_TBL t1 "
+                + "join TEST_TBL t2 on t1.id = t2.id";
+
+        // Only hash join
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "MergeJoinConverter");
+
+        AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
+        List<RelNode> joinNodes = findNodes(phys, byClass(AbstractIgniteJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, joinNodes.size(), equalTo(1));
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+    }
+
+    /**
+     * Join of the same tables with a complex affinity is expected to be colocated.
+     */
+    @ParameterizedTest(name = "DISABLED: {0}")
+    @ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
+    public void joinSameTableComplexAff(String disabledRule) throws Exception {
         IgniteTable tbl = complexTbl("TEST_TBL");
 
         IgniteSchema schema = createSchema(tbl);
@@ -82,16 +114,74 @@
                 + "from TEST_TBL t1 "
                 + "join TEST_TBL t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
 
-        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
 
-        IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
+        AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
 
         String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
 
         assertThat(invalidPlanMsg, join, notNullValue());
         assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
-        assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
-        assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+
+        if (!"MergeJoinConverter".equals(disabledRule)) {
+            assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
+            assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+        }
+    }
+
+    /**
+     * Join of the same tables with a complex affinity is expected to be colocated.
+     */
+    @ParameterizedTest(name = "DISABLED: {0}")
+    @ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
+    public void joinDiffTablesComplexAff(String disabledRule) throws Exception {
+        IgniteTable tbl1 = complexTbl("TEST_TBL1");
+        IgniteTable tbl2 = complexTbl("TEST_TBL2");
+
+        IgniteSchema schema = createSchema(tbl1, tbl2);
+
+        String sql = "select count(*) "
+                + "from TEST_TBL1 t1 "
+                + "join TEST_TBL2 t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
+
+        AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+
+        if (!"MergeJoinConverter".equals(disabledRule)) {
+            assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
+            assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+        }
+    }
+
+    /**
+     * Join of the same tables with a complex affinity is expected to be colocated.
+     */
+    @ParameterizedTest(name = "DISABLED: {0}")
+    @ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
+    public void joinDiffTablesNotSupersetComplexAff(String disabledRule) throws Exception {
+        IgniteTable tbl1 = complexTbl("TEST_TBL1");
+        IgniteTable tbl2 = complexTbl("TEST_TBL2");
+
+        IgniteSchema schema = createSchema(tbl1, tbl2);
+
+        String sql = "select count(*) "
+                + "from TEST_TBL1 t1 "
+                + "join TEST_TBL2 t2 on t1.id1 = t2.id1";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
+
+        AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(false));
     }
 
     /**
@@ -124,7 +214,7 @@
                                 ))
                         ))
                 ))
-        ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+        ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
     }
 
     /**
@@ -162,10 +252,10 @@
                                 .and(scan -> complexTblIndirect.equals(scan.getTable().unwrap(IgniteTable.class)))
                         ))
                 ))
-        ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+        ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
     }
 
-    private static IgniteTable simpleTable(String tableName, int size) {
+    static IgniteTable simpleTable(String tableName, int size) {
         return TestBuilders.table()
                 .name(tableName)
                 .size(size)
@@ -180,7 +270,22 @@
                 .build();
     }
 
-    private static IgniteTable complexTbl(String tableName) {
+    static IgniteTable simpleTableHashPk(String tableName, int size) {
+        return TestBuilders.table()
+                .name(tableName)
+                .size(size)
+                .distribution(someAffinity())
+                .addColumn("ID", NativeTypes.INT32)
+                .addColumn("ID2", NativeTypes.INT32)
+                .addColumn("VAL", NativeTypes.STRING)
+                .hashIndex()
+                .name("PK")
+                .addColumn("ID")
+                .end()
+                .build();
+    }
+
+    static IgniteTable complexTbl(String tableName) {
         return complexTbl(tableName, DEFAULT_TBL_SIZE,
                 IgniteDistributions.affinity(ImmutableIntList.of(0, 1), nextTableId(), DEFAULT_ZONE_ID));
     }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java
index 1e21564..d394d46 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinCommutePlannerTest.java
@@ -115,7 +115,7 @@
         // Use aggregates that are the same for both MAP and REDUCE phases.
         String sql = "SELECT SUM(s.id), SUM(h.id) FROM SMALL s RIGHT JOIN HUGE h on h.id = s.id";
 
-        IgniteRel phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+        IgniteRel phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
 
         assertNotNull(phys);
 
@@ -127,7 +127,7 @@
 
         assertEquals(JoinRelType.LEFT, join.getJoinType());
 
-        PlanningContext ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+        PlanningContext ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
 
         RelOptPlanner pl = ctx.cluster().getPlanner();
 
@@ -139,7 +139,7 @@
 
         assertNotNull(phys);
 
-        phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+        phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
 
         join = findFirstNode(phys, byClass(IgniteNestedLoopJoin.class));
 
@@ -150,7 +150,7 @@
         // no commute
         assertEquals(JoinRelType.RIGHT, join.getJoinType());
 
-        ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+        ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
 
         pl = ctx.cluster().getPlanner();
 
@@ -166,7 +166,7 @@
         // Use aggregates that are the same for both MAP and REDUCE phases.
         String sql = "SELECT SUM(s.id), SUM(h.id) FROM SMALL s JOIN HUGE h on h.id = s.id";
 
-        IgniteRel phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+        IgniteRel phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
 
         assertNotNull(phys);
 
@@ -194,7 +194,7 @@
 
         assertEquals(JoinRelType.INNER, join.getJoinType());
 
-        PlanningContext ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin");
+        PlanningContext ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin");
 
         RelOptPlanner pl = ctx.cluster().getPlanner();
 
@@ -205,7 +205,7 @@
 
         assertNotNull(phys);
 
-        phys = physicalPlan(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+        phys = physicalPlan(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
 
         join = findFirstNode(phys, byClass(IgniteNestedLoopJoin.class));
         proj = findFirstNode(phys, byClass(IgniteProject.class));
@@ -231,7 +231,7 @@
         // no commute
         assertEquals(JoinRelType.INNER, join.getJoinType());
 
-        ctx = plannerCtx(sql, publicSchema, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
+        ctx = plannerCtx(sql, publicSchema, "HashJoinConverter", "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule");
 
         pl = ctx.cluster().getPlanner();
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index 6447185..0a4c52d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -258,7 +258,8 @@
      */
     @Test
     public void noSortAppendingWithCorrectCollation() throws Exception {
-        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+                "HashJoinConverter"};
 
         assertPlan(TestCase.CASE_16,
                 nodeOrAnyChild(isInstanceOf(IgniteSort.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index 14d8da0..0ca318b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -260,7 +260,8 @@
      */
     @Test
     public void noSortAppendingWithCorrectCollation() throws Exception {
-        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
+        String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule",
+                "HashJoinConverter"};
 
         assertPlan(TestCase.CASE_16,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
index a0f95d8..0029c7a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
@@ -47,7 +47,8 @@
             "NestedLoopJoinConverter",
             "CorrelatedNestedLoopJoin",
             "FilterSpoolMergeRule",
-            "JoinCommuteRule"
+            "JoinCommuteRule",
+            "HashJoinConverter"
     };
 
     /**
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index 6dfd738..6fd93e0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -63,7 +63,7 @@
         IgniteRel phys = physicalPlan(
                 sql,
                 publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
+                "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
         );
 
         IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
@@ -95,7 +95,7 @@
         IgniteRel phys = physicalPlan(
                 sql,
                 publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
+                "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
         );
 
         System.out.println("+++ \n" + RelOptUtil.toString(phys));
@@ -150,7 +150,7 @@
                                 })
                                 .and(hasChildThat(isIndexScan("T1", "idx_jid")))
                         )),
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
+                "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule"
         );
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
index 6cfd44d..547f8ec 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TableSpoolPlannerTest.java
@@ -52,7 +52,7 @@
                 + "join t1 on t0.jid > t1.jid";
 
         IgniteRel phys = physicalPlan(sql, publicSchema,
-                "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
+                "HashJoinConverter", "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
 
         assertNotNull(phys);
 
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 779cd1a..9dd930b 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -129,11 +129,10 @@
   exchangeSourceNodes: {1=[N1]}
   tree:
     Project
-      MergeJoin
+      HashJoin
         Receiver(sourceFragment=1, exchange=1, distribution=single)
-        Sort
-          Filter
-            TableFunctionScan(source=2, distribution=single)
+        Filter
+          TableFunctionScan(source=2, distribution=single)
 
 Fragment#1
   targetNodes: [N0]
@@ -142,8 +141,7 @@
   partitions: {N1=[0:1]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
-      Sort
-        TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
+      TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
 ---
 
 N0
diff --git a/modules/sql-engine/src/test/resources/mapping/hash_join.test b/modules/sql-engine/src/test/resources/mapping/hash_join.test
new file mode 100644
index 0000000..7c4ae58
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/hash_join.test
@@ -0,0 +1,102 @@
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n1 USING (id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [T1_N1, T2_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      Project
+        HashJoin
+          TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+          TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 t1 JOIN t1_n1 t2 ON t1.id = t2.id
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [T1_N1, T1_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      HashJoin
+        TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+        TableScan(name=PUBLIC.T1_N1, source=3, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n2 USING (id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#4
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [5]
+  exchangeSourceNodes: {5=[N2]}
+  tables: [T1_N1, T2_N2]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      Project
+        HashJoin
+          TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+          Receiver(sourceFragment=5, exchange=5, distribution=affinity[table: T2_N2, columns: [ID]])
+
+Fragment#5
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T2_N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=5, distribution=affinity[table: T2_N2, columns: [ID]])
+      TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, distribution=affinity[table: T2_N2, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('JoinCommuteRule', 'MergeJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1
+JOIN TABLE(SYSTEM_RANGE(0, 10)) r ON t1_n1.id = r.x
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Project
+      HashJoin
+        Receiver(sourceFragment=1, exchange=1, distribution=single)
+        TableFunctionScan(source=2, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N1, source=3, partitions=1, distribution=affinity[table: T1_N1, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/merge_join.test b/modules/sql-engine/src/test/resources/mapping/merge_join.test
index 5df9ab5..c1d88b6 100644
--- a/modules/sql-engine/src/test/resources/mapping/merge_join.test
+++ b/modules/sql-engine/src/test/resources/mapping/merge_join.test
@@ -1,5 +1,5 @@
 N0
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * FROM t1_n1 JOIN t2_n1 USING (id)
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n1 USING (id)
 ---
 Fragment#0 root
   executionNodes: [N0]
@@ -24,7 +24,7 @@
 ---
 
 N1
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * FROM t1_n1 JOIN t2_n1 USING (id)
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n1 USING (id)
 ---
 Fragment#0 root
   executionNodes: [N1]
@@ -49,7 +49,7 @@
 ---
 
 N0
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * FROM t1_n1 JOIN t2_n2 USING (id)
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN t2_n2 USING (id)
 ---
 Fragment#0 root
   executionNodes: [N0]
diff --git a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 8fc500b..26ee11a 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -20,7 +20,7 @@
 ---
 # Partition pruning of joined tables (relies on predicate push down)
 N1
-SELECT * FROM t1_n1n2n3 as t1, t2_n4n5 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 42
+SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'HashJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1n2n3 as t1, t2_n4n5 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 42
 ---
 Fragment#0 root
   executionNodes: [N1]
@@ -53,7 +53,7 @@
 ---
 # Self join, different predicates that produce same set of partitions
 N1
-SELECT * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 17
+SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'HashJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.c1 = t2.c1 and t1.id = 1 and t2.id = 17
 ---
 Fragment#0 root
   executionNodes: [N1]
@@ -86,7 +86,7 @@
 ---
 # Self join, different predicates that produce disjoint set of partitions
 N1
-SELECT * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.id = t2.id and t1.id = 1 and t2.id = 42
+SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'HashJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1n2n3 as t1, t1_n1n2n3 as t2 WHERE t1.id = t2.id and t1.id = 1 and t2.id = 42
 ---
 Fragment#0 root
   executionNodes: [N1]
diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java
index 4ab357b..f70a565 100644
--- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java
+++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/BaseSqlIntegrationTest.java
@@ -111,19 +111,29 @@
         NESTED_LOOP(
                 "CorrelatedNestedLoopJoin",
                 "JoinCommuteRule",
-                "MergeJoinConverter"
+                "MergeJoinConverter",
+                "HashJoinConverter"
         ),
 
         MERGE(
                 "CorrelatedNestedLoopJoin",
                 "JoinCommuteRule",
-                "NestedLoopJoinConverter"
+                "NestedLoopJoinConverter",
+                "HashJoinConverter"
         ),
 
         CORRELATED(
                 "MergeJoinConverter",
                 "JoinCommuteRule",
-                "NestedLoopJoinConverter"
+                "NestedLoopJoinConverter",
+                "HashJoinConverter"
+        ),
+
+        HASHJOIN(
+                "MergeJoinConverter",
+                "JoinCommuteRule",
+                "NestedLoopJoinConverter",
+                "CorrelatedNestedLoopJoin"
         );
 
         private final String[] disabledRules;