IGNITE-13019 Fix incorrect JOIN when querying a single-node cluster (#8744)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 276694e..f0e3f14 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -125,6 +125,12 @@
     /** Whether partition extraction is possible. */
     private final boolean canExtractPartitions;
 
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
     /** */
     private final IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases = new IdentityHashMap<>();
 
@@ -144,11 +150,14 @@
         boolean collocatedGrpBy,
         boolean distributedJoins,
         boolean locSplit,
-        PartitionExtractor extractor
+        PartitionExtractor extractor,
+        IgniteLogger log
     ) {
         this.paramsCnt = paramsCnt;
         this.collocatedGrpBy = collocatedGrpBy;
         this.extractor = extractor;
+        this.distributedJoins = distributedJoins;
+        this.log = log;
 
         // Partitions *CANNOT* be extracted if:
         // 1) Distributed joins are enabled (https://issues.apache.org/jira/browse/IGNITE-10971)
@@ -263,7 +272,8 @@
             collocatedGrpBy,
             distributedJoins,
             locSplit,
-            idx.partitionExtractor()
+            idx.partitionExtractor(),
+            log
         );
 
         // Normalization will generate unique aliases for all the table filters in FROM.
@@ -1263,11 +1273,14 @@
 
         setupParameters(map, mapQry, paramsCnt);
 
+        SqlAstTraverser traverser = new SqlAstTraverser(mapQry, distributedJoins, log);
+        traverser.traverse();
+
         map.columns(collectColumns(mapExps));
         map.sortColumns(mapQry.sort());
-        map.partitioned(SplitterUtils.hasPartitionedTables(mapQry));
-        map.hasSubQueries(SplitterUtils.hasSubQueries(mapQry));
-        map.hasOuterJoinReplicatedPartitioned(SplitterUtils.hasOuterJoinReplicatedPartitioned(mapQry.from()));
+        map.partitioned(traverser.hasPartitionedTables());
+        map.hasSubQueries(traverser.hasSubQueries());
+        map.hasOuterJoinReplicatedPartitioned(traverser.hasOuterJoinReplicatedPartitioned());
 
         if (map.isPartitioned() && canExtractPartitions)
             map.derivedPartitions(extractor.extract(mapQry));
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java
index fe47c68..3fcfcbd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java
@@ -165,35 +165,6 @@
     }
 
     /**
-     * Checks whether the expression has an OUTER JOIN from replicated to partitioned.
-     *
-     * This is used to infer the `treatReplicatedAsPartitioned` flag
-     * to eventually pass it to {@link org.apache.ignite.spi.indexing.IndexingQueryFilterImpl}.
-     *
-     * @param from FROM expression.
-     * @return {@code true} if the expression has an OUTER JOIN from replicated to partitioned.
-     */
-    public static boolean hasOuterJoinReplicatedPartitioned(GridSqlAst from) {
-        boolean isRightPartitioned = false;
-        while (from instanceof GridSqlJoin) {
-            GridSqlJoin join = (GridSqlJoin)from;
-
-            assert !(join.rightTable() instanceof GridSqlJoin);
-
-            isRightPartitioned = isRightPartitioned || hasPartitionedTables(join.rightTable());
-
-            if (join.isLeftOuter()) {
-                boolean isLeftPartitioned = hasPartitionedTables(join.leftTable());
-                return !isLeftPartitioned && isRightPartitioned;
-            }
-
-            from = join.leftTable();
-        }
-
-        return false;
-    }
-
-    /**
      * @param ast Reduce query AST.
      * @param rdcQry Reduce query string.
      */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SqlAstTraverser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SqlAstTraverser.java
new file mode 100644
index 0000000..dbf6227
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SqlAstTraverser.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.sql;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+
+/**
+ * Traverse over query AST to find info about partitioned table usage.
+ */
+class SqlAstTraverser {
+    /** Query AST root to check. */
+    private final GridSqlAst root;
+
+    /** Whether user specified distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Whether query has partitioned tables. */
+    private boolean hasPartitionedTables;
+
+    /** Whether query has sub queries. */
+    private boolean hasSubQueries;
+
+    /** Whether query has joins between replicated and partitioned tables. */
+    private boolean hasOuterJoinReplicatedPartitioned;
+
+    /** */
+    SqlAstTraverser(GridSqlAst root, boolean distributedJoins, IgniteLogger log) {
+        this.root = root;
+        this.distributedJoins = distributedJoins;
+        this.log = log;
+    }
+
+    /** */
+    public void traverse() {
+        lookForPartitionedJoin(root, null);
+    }
+
+    /** */
+    public boolean hasPartitionedTables() {
+        return hasPartitionedTables;
+    }
+
+    /** */
+    public boolean hasSubQueries() {
+        return hasSubQueries;
+    }
+
+    /** */
+    public boolean hasOuterJoinReplicatedPartitioned() {
+        return hasOuterJoinReplicatedPartitioned;
+    }
+
+    /**
+     * Traverse AST while join operation isn't found. Check it if found.
+     *
+     * @param ast AST item to check recursively.
+     * @param upWhere Where condition that applies to this ast.
+     */
+    private void lookForPartitionedJoin(GridSqlAst ast, GridSqlAst upWhere) {
+        if (ast == null)
+            return;
+
+        GridSqlJoin join = null;
+        GridSqlAst where = null;
+
+        if (ast instanceof GridSqlJoin) {
+            join = (GridSqlJoin) ast;
+            where = upWhere;
+        }
+        else if (ast instanceof GridSqlSelect) {
+            GridSqlSelect select = (GridSqlSelect) ast;
+
+            if (select.from() instanceof GridSqlJoin) {
+                join = (GridSqlJoin) select.from();
+                where = select.where();
+            }
+        }
+        else if (ast instanceof GridSqlSubquery)
+            hasSubQueries = true;
+        else if (ast instanceof GridSqlTable)
+            hasPartitionedTables |= ((GridSqlTable) ast).dataTable().isPartitioned();
+
+        // No joins on this level. Traverse AST deeper.
+        if (join == null) {
+            for (int i = 0; i < ast.size(); i++)
+                lookForPartitionedJoin(ast.child(i), null);
+
+            return;
+        }
+
+        // Check WHERE clause first.
+        lookForPartitionedJoin(where, null);
+
+        // Check left side of join.
+        GridSqlTable leftTable = getTable(join.leftTable());
+
+        GridH2Table left = null;
+
+        // Left side of join is a subquery.
+        if (leftTable == null) {
+            hasSubQueries = true;
+
+            // Check subquery on left side.
+            lookForPartitionedJoin(join.leftTable(), where);
+        }
+        else {
+            left = leftTable.dataTable();
+
+            // Data table is NULL for views.
+            if (left != null && left.isPartitioned())
+                hasPartitionedTables = true;
+        }
+
+        // Check right side of join.
+        GridSqlTable rightTable = getTable(join.rightTable());
+
+        // Right side of join is a subquery.
+        if (rightTable == null) {
+            hasSubQueries = true;
+
+            // Check subquery and return (can't exctract more info there).
+            lookForPartitionedJoin(join.rightTable(), where);
+            return;
+        }
+
+        GridH2Table right = rightTable.dataTable();
+
+        if (right != null && right.isPartitioned())
+            hasPartitionedTables = true;
+
+        // Skip check of views.
+        if (left == null || right == null)
+            return;
+
+        if (join.isLeftOuter() && !left.isPartitioned() && right.isPartitioned())
+            hasOuterJoinReplicatedPartitioned = true;
+
+        // Skip check if at least one of tables isn't partitioned.
+        if (!(left.isPartitioned() && right.isPartitioned()))
+            return;
+
+        if (!distributedJoins)
+            checkPartitionedJoin(join, where, left, right, log);
+    }
+
+    /**
+     * Checks whether an AST contains valid join operation between partitioned tables.
+     * Join condition should be an equality operation of affinity keys of tables. Conditions can be splitted between
+     * join and where clauses. If join is invalid then warning a user about that.
+     *
+     * @param join The join to check.
+     * @param where The where statement from previous AST, for nested joins.
+     * @param left Left side of join.
+     * @param right Right side of join.
+     * @param log Ignite logger.
+     */
+    private void checkPartitionedJoin(GridSqlJoin join, GridSqlAst where, GridH2Table left, GridH2Table right, IgniteLogger log) {
+        String leftTblAls = getAlias(join.leftTable());
+        String rightTblAls = getAlias(join.rightTable());
+
+        // User explicitly specify an affinity key. Otherwise use primary key.
+        boolean pkLeft = left.getExplicitAffinityKeyColumn() == null;
+        boolean pkRight = right.getExplicitAffinityKeyColumn() == null;
+
+        Set<String> leftAffKeys = affKeys(pkLeft, left);
+        Set<String> rightAffKeys = affKeys(pkRight, right);
+
+        boolean joinIsValid = checkPartitionedCondition(join.on(),
+            leftTblAls, leftAffKeys, pkLeft,
+            rightTblAls, rightAffKeys, pkRight);
+
+        if (!joinIsValid && where instanceof GridSqlElement)
+            joinIsValid = checkPartitionedCondition((GridSqlElement) where,
+                leftTblAls, leftAffKeys, pkLeft,
+                rightTblAls, rightAffKeys, pkRight);
+
+        if (!joinIsValid) {
+            log.warning(
+                String.format(
+                    "For join two partitioned tables join condition should be the equality operation of affinity keys." +
+                        " Left side: %s; right side: %s", left.getName(), right.getName())
+            );
+        }
+    }
+
+    /** Extract table instance from an AST element. */
+    private GridSqlTable getTable(GridSqlElement el) {
+        if (el instanceof GridSqlTable)
+            return (GridSqlTable) el;
+
+        if (el instanceof GridSqlAlias && el.child() instanceof GridSqlTable)
+            return el.child();
+
+        return null;
+    }
+
+    /** Extract alias value. */
+    private String getAlias(GridSqlElement el) {
+        if (el instanceof GridSqlAlias)
+            return ((GridSqlAlias)el).alias();
+
+        return null;
+    }
+
+    /** @return Set of possible affinity keys for this table, incl. default _KEY. */
+    private Set<String> affKeys(boolean pk, GridH2Table tbl) {
+        Set<String> affKeys = new HashSet<>();
+
+        // User explicitly specify an affinity key. Otherwise use primary key.
+        if (!pk)
+            affKeys.add(tbl.getAffinityKeyColumn().columnName);
+        else {
+            affKeys.add("_KEY");
+
+            String keyFieldName = tbl.rowDescriptor().type().keyFieldName();
+
+            if (keyFieldName == null)
+                affKeys.addAll(tbl.rowDescriptor().type().primaryKeyFields());
+            else
+                affKeys.add(keyFieldName);
+        }
+
+        return affKeys;
+    }
+
+    /**
+     * Valid join condition contains:
+     * 1. Equality of Primary (incl. cases of complex PK) or Affinity keys;
+     * 2. Additional conditions must be joint with AND operation to the affinity join condition.
+     *
+     * @return {@code true} if join condition contains affinity join condition, otherwise {@code false}.
+     */
+    private boolean checkPartitionedCondition(GridSqlElement condition,
+        String leftTbl, Set<String> leftAffKeys, boolean pkLeft,
+        String rightTbl, Set<String> rightAffKeys, boolean pkRight) {
+
+        if (!(condition instanceof GridSqlOperation))
+            return false;
+
+        GridSqlOperation op = (GridSqlOperation) condition;
+
+        // It is may be a part of affinity condition.
+        if (GridSqlOperationType.EQUAL == op.operationType())
+            checkEqualityOperation(op, leftTbl, leftAffKeys, pkLeft, rightTbl, rightAffKeys, pkRight);
+
+        // Check affinity condition is covered fully. If true then return. Otherwise go deeper.
+        if (affinityCondIsCovered(leftAffKeys, rightAffKeys))
+            return true;
+
+        // If we don't cover affinity condition prior to first AND then this is not an affinity condition.
+        if (GridSqlOperationType.AND != op.operationType())
+            return false;
+
+        // Go recursively to childs.
+        for (int i = 0; i < op.size(); i++) {
+            boolean ret = checkPartitionedCondition(op.child(i),
+                leftTbl, leftAffKeys, pkLeft,
+                rightTbl, rightAffKeys, pkRight);
+
+            if (ret)
+                return true;
+        }
+
+        // Join condition doesn't contain affinity condition.
+        return false;
+    }
+
+    /** */
+    private void checkEqualityOperation(GridSqlOperation equalOp,
+        String leftTbl, Set<String> leftCols, boolean pkLeft,
+        String rightTbl, Set<String> rightCols, boolean pkRight) {
+
+        if (!(equalOp.child(0) instanceof GridSqlColumn))
+            return;
+
+        if (!(equalOp.child(1) instanceof GridSqlColumn))
+            return;
+
+        String leftCol = ((GridSqlColumn) equalOp.child(0)).columnName();
+        String rightCol = ((GridSqlColumn) equalOp.child(1)).columnName();
+
+        String leftTblAls = ((GridSqlColumn) equalOp.child(0)).tableAlias();
+        String rightTblAls = ((GridSqlColumn) equalOp.child(1)).tableAlias();
+
+        Set<String> actLeftCols;
+        Set<String> actRightCols;
+
+        if (leftTbl.equals(leftTblAls))
+            actLeftCols = leftCols;
+        else if (leftTbl.equals(rightTblAls))
+            actLeftCols = rightCols;
+        else
+            return;
+
+        if (rightTbl.equals(rightTblAls))
+            actRightCols = rightCols;
+        else if (rightTbl.equals(leftTblAls))
+            actRightCols = leftCols;
+        else
+            return;
+
+        // This is part of the affinity join condition.
+        if (actLeftCols.contains(leftCol) && actRightCols.contains(rightCol)) {
+            if (pkLeft && "_KEY".equals(leftCol))
+                actLeftCols.clear();
+            else if (pkLeft) {
+                actLeftCols.remove(leftCol);
+                // Only _KEY is there.
+                if (actLeftCols.size() == 1)
+                    actLeftCols.clear();
+            }
+            else
+                actLeftCols.remove(leftCol);
+
+            if (pkRight && "_KEY".equals(rightCol))
+                actRightCols.clear();
+            else if (pkRight) {
+                actRightCols.remove(rightCol);
+                // Only _KEY is there.
+                if (actRightCols.size() == 1)
+                    actRightCols.clear();
+            }
+            else
+                actRightCols.remove(rightCol);
+        }
+    }
+
+    /** */
+    private boolean affinityCondIsCovered(Set<String> leftAffKeys, Set<String> rightAffKeys) {
+        return leftAffKeys.isEmpty() && rightAffKeys.isEmpty();
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/sqltests/CheckWarnJoinPartitionedTables.java b/modules/indexing/src/test/java/org/apache/ignite/sqltests/CheckWarnJoinPartitionedTables.java
new file mode 100644
index 0000000..0163fdf
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/sqltests/CheckWarnJoinPartitionedTables.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.sqltests;
+
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/** Check that illegal joins of partitioned tables are warned. */
+public class CheckWarnJoinPartitionedTables extends GridCommonAbstractTest {
+    /** */
+    private final ListeningTestLogger testLog = new ListeningTestLogger(log);
+
+    /** */
+    private Ignite crd;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setGridLogger(testLog);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        crd = startGrid();
+    }
+
+    /** */
+    @Test
+    public void joinSameTableWithPrimaryKey() {
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE A (ID INT PRIMARY KEY, TITLE VARCHAR);"));
+
+        checkSameTableWithJoinType("LEFT JOIN");
+        checkSameTableWithJoinType("RIGHT JOIN");
+        checkSameTableWithJoinType("INNER JOIN");
+        checkSameTableWithJoinType("JOIN");
+    }
+
+    /** */
+    @Test
+    public void joinSameTableWithPrimaryAffinityKey() {
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE A (ID INT PRIMARY KEY, TITLE VARCHAR) with \"AFFINITY_KEY=ID\";"));
+
+        checkSameTableWithJoinType("LEFT JOIN");
+        checkSameTableWithJoinType("RIGHT JOIN");
+        checkSameTableWithJoinType("INNER JOIN");
+        checkSameTableWithJoinType("JOIN");
+    }
+
+    /** */
+    private void checkSameTableWithJoinType(String joinType) {
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a2.ID = a1.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2._KEY;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2._KEY;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID = a2._KEY;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID AND a1.TITLE = a2.TITLE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID AND a1.TITLE != a2.TITLE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID AND (a1.TITLE = a2.TITLE OR a1.ID = a2.ID);");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on (a1.TITLE = a2.TITLE OR a1.ID = a2.ID) AND a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " (select a2.ID from A a2) a3 on a1.ID = a3.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " (select ID from A) a2 where a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 where a1.ID in (select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.ID);");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " (select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.ID) t" +
+                " on a1.ID = t.ID;");
+
+        // Ignite doesn't support this.
+        if (!joinType.contains("RIGHT"))
+            checkLogListener(false,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID " + joinType + " A a3 on a1.ID = a3.ID;");
+
+        // Some simple illegal joins.
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on 1 = 1;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID > a2.ID;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID != a2.ID;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.TITLE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2.TITLE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.TITLE = a2.TITLE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID OR a1.TITLE = a2.TITLE;");
+
+        // Actually this is correct query. But this AST is too complex to analyze, and also it could be simplified.
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID OR (a1.ID = a2.ID AND a1.TITLE = a2.TITLE);");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " (select ID from A) a2 on a1.ID > a2.ID;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " (select ID from A) a2 where a1.ID > a2.ID;");
+
+        if (!joinType.contains("RIGHT"))
+            checkLogListener(true,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.TITLE " + joinType + " A a3 on a1.ID = a3.ID;");
+
+        // This query is invalid. But we don't have info about partitioning in nested joins.
+        if (!joinType.contains("RIGHT")) {
+            // For inner join we can check it because join conditions are parsed to WHERE clause.
+            boolean shouldWarn = !joinType.contains("LEFT");
+
+            checkLogListener(shouldWarn,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID " + joinType + " A a3 on a1.ID = a3.TITLE;");
+        }
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 where a1.ID in (select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.TITLE);");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 where a1.ID in (select a2.ID from A a2 " + joinType + " A a3 on a2.ID != a3.ID);");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " (select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.TITLE) t" +
+                " on a1.ID = t.ID;");
+    }
+
+    /** */
+    @Test
+    public void joinSameTableWithComplexPrimaryKey() {
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE A (ID INT, TITLE VARCHAR, PRICE INT, COMMENT VARCHAR, PRIMARY KEY (ID, TITLE, PRICE));"));
+
+        checkSameTableWithComplexPrimaryKeyWithJoinType("LEFT JOIN");
+        checkSameTableWithComplexPrimaryKeyWithJoinType("RIGHT JOIN");
+        checkSameTableWithComplexPrimaryKeyWithJoinType("INNER JOIN");
+        checkSameTableWithComplexPrimaryKeyWithJoinType("JOIN");
+    }
+
+    /** */
+    private void checkSameTableWithComplexPrimaryKeyWithJoinType(String joinType) {
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2._KEY;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1._KEY = a2._KEY;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a2._KEY = a1._KEY;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID = a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on " +
+                "a1.ID = a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE and a1.COMMENT = a2.COMMENT;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on " +
+                "a1.ID = a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE and a1.COMMENT != a2.COMMENT;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.PRICE = a2.PRICE and a1.TITLE = a2.TITLE and a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.PRICE = a2.PRICE and a1.TITLE = a2.TITLE and a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID where a1.PRICE = a2.PRICE and a1.TITLE = a2.TITLE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID where a1.PRICE = a2.PRICE and a1.TITLE = a2.TITLE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 where a1.ID in (" +
+                "   select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.ID and a2.TITLE = a3.TITLE and a2.PRICE = a3.PRICE);");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 where a1.ID in (" +
+                "   select a2.ID from A a2 " + joinType + " A a3 where a2.ID = a3.ID and a2.TITLE = a3.TITLE and a2.PRICE = a3.PRICE);");
+
+        if (!joinType.contains("RIGHT"))
+            checkLogListener(false,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2._KEY " + joinType + " A a3 on a1._KEY = a3._KEY;");
+
+        // Some simple illegal joins.
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a2._KEY != a1._KEY;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a2._KEY != a1._KEY;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID != a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID != a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID != a2.ID and a1.TITLE != a2.TITLE and a1.PRICE != a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID != a2.ID and a1.TITLE != a2.TITLE and a1.PRICE != a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID = a2.ID and a1.PRICE = a2.PRICE;");
+
+        if (!joinType.contains("RIGHT")) {
+            // For inner join we can check it because join conditions are parsed to WHERE clause.
+            boolean shouldWarn = !joinType.contains("LEFT");
+
+            checkLogListener(false,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2._KEY " + joinType + " A a3 on a1._KEY != a3._KEY;");
+
+            checkLogListener(!shouldWarn,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY != a2._KEY " + joinType + " A a3 on a1._KEY = a3._KEY;");
+        }
+    }
+
+    /** */
+    @Test
+    public void joinSameTableWithComplexPrimaryKeySingleAffKey() {
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE A (ID INT, TITLE VARCHAR, PRICE INT, COMMENT VARCHAR, PRIMARY KEY (ID, TITLE, PRICE))" +
+                " with \"AFFINITY_KEY=ID\";"));
+
+        checkSameTableWithComplexPrimaryKeySingleAffKeyWithJoinType("LEFT JOIN");
+        checkSameTableWithComplexPrimaryKeySingleAffKeyWithJoinType("RIGHT JOIN");
+        checkSameTableWithComplexPrimaryKeySingleAffKeyWithJoinType("INNER JOIN");
+        checkSameTableWithComplexPrimaryKeySingleAffKeyWithJoinType("JOIN");
+    }
+
+    /** */
+    private void checkSameTableWithComplexPrimaryKeySingleAffKeyWithJoinType(String joinType) {
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID and a1.TITLE != a2.TITLE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID = a2.ID and a1.TITLE != a2.TITLE;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.TITLE != a2.TITLE and a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.TITLE != a2.TITLE and a1.ID = a2.ID;");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 where a1.ID in (select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.ID);");
+
+        checkLogListener(false,
+            "SELECT a1.* FROM A a1 " + joinType + " (select a2.ID from A a2 " + joinType + " A a3 on a2.ID = a3.ID) t" +
+                " on a1.ID = t.ID;");
+
+        if (!joinType.contains("RIGHT"))
+            checkLogListener(false,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID " + joinType + " A a3 on a1.ID = a3.ID;");
+
+        // Some simple illegal joins.
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1._KEY = a2._KEY;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1._KEY = a2._KEY;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID != a2.ID;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID != a2.ID;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID != a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.ID != a2.ID and a1.TITLE = a2.TITLE and a1.PRICE = a2.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.TITLE = a2.TITLE;");
+
+        checkLogListener(true,
+            "SELECT a1.* FROM A a1 " + joinType + " A a2 where a1.TITLE = a2.TITLE;");
+
+        if (!joinType.contains("RIGHT")) {
+            // For inner join we can check it because join conditions are parsed to WHERE clause.
+            boolean shouldWarn = !joinType.contains("LEFT");
+
+            checkLogListener(false,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID = a2.ID " + joinType + " A a3 on a1.ID != a3.ID;");
+
+            checkLogListener(!shouldWarn,
+                "SELECT a1.* FROM A a1 " + joinType + " A a2 on a1.ID != a2.ID " + joinType + " A a3 on a1.ID = a3.ID;");
+        }
+    }
+
+    /** */
+    @Test
+    public void joinWithPrimaryKey() {
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE A (ID INT PRIMARY KEY, TITLE VARCHAR);"));
+
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE B (ID INT PRIMARY KEY, PRICE INT);"));
+
+        checkJoinPrimaryKeyWithJoinType("LEFT JOIN");
+        checkJoinPrimaryKeyWithJoinType("RIGHT JOIN");
+        checkJoinPrimaryKeyWithJoinType("INNER JOIN");
+        checkJoinPrimaryKeyWithJoinType("JOIN");
+    }
+
+    /** */
+    private void checkJoinPrimaryKeyWithJoinType(String joinType) {
+        checkLogListener(false,
+            "SELECT a.* FROM A a " + joinType + " B b on a.ID = b.ID;");
+
+        checkLogListener(false,
+            "SELECT a.* FROM A a " + joinType + " B b on b.ID = a.ID;");
+
+        checkLogListener(false,
+            "SELECT a.* FROM A a " + joinType + " B b on b._KEY = a._KEY;");
+
+        checkLogListener(false,
+            "SELECT a.* FROM A a " + joinType + " B b on b.ID = a.ID and a.TITLE != 'Title';");
+
+        checkLogListener(false,
+            "SELECT a.* FROM A a " + joinType + " B b on b.ID = a.ID and b.PRICE > 100;");
+
+        checkLogListener(false,
+            "SELECT a.* FROM A a " + joinType + " B b on b.ID = a.ID and b.PRICE != a.ID;");
+
+        // Some wrong queries.
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b on 1 = 1;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b where 1 = 1;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b on a.ID != b.ID;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b where a.ID != b.ID;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b on b.ID = 1;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b where b.ID = 1;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b on a.ID = b.PRICE;");
+
+        checkLogListener(true,
+            "SELECT a.* FROM A a " + joinType + " B b where a.ID = b.PRICE;");
+    }
+
+
+    /** */
+    @Test
+    // This test should not pass as it contains wrong query. But current check pass it.
+    public void testWrongQueryButAllAffinityKeysAreUsed() {
+        execute(crd, new SqlFieldsQuery(
+            "CREATE TABLE A (ID INT, TITLE VARCHAR, PRICE INT, COMMENT VARCHAR, PRIMARY KEY (ID, TITLE, PRICE));"));
+
+        // PRICE = ID
+        checkLogListener(false,
+                "SELECT a1.* FROM A a1 LEFT JOIN A a2 on a1.PRICE = a2.ID and a1.TITLE = a2.TITLE and a1.ID = a2.PRICE;");
+    }
+
+    /** */
+    private void checkLogListener(boolean shouldFindMsg, String sql) {
+        LogListener lsnr = LogListener.matches(
+            "For join two partitioned tables join condition should be the equality operation of affinity keys"
+        ).build();
+
+        testLog.registerListener(lsnr);
+
+        execute(crd, new SqlFieldsQuery(sql));
+
+        if (shouldFindMsg)
+            assertTrue(sql, lsnr.check());
+        else
+            assertFalse(sql, lsnr.check());
+
+        testLog.clearListeners();
+    }
+
+    /**
+     * Execute query from node.
+     *
+     * @param node node to use to perform query.
+     * @param qry query.
+     */
+    protected final void execute(Ignite node, SqlFieldsQuery qry) {
+        FieldsQueryCursor<List<?>> cursor = ((IgniteEx)node).context().query().querySqlFields(qry, false);
+
+        cursor.getAll();
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index fbcd303..c37d315 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -288,6 +288,7 @@
 import org.apache.ignite.internal.sql.SqlParserTransactionalKeywordsSelfTest;
 import org.apache.ignite.internal.sql.SqlParserUserSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
+import org.apache.ignite.sqltests.CheckWarnJoinPartitionedTables;
 import org.apache.ignite.sqltests.PartitionedSqlTest;
 import org.apache.ignite.sqltests.ReplicatedSqlCustomPartitionsTest;
 import org.apache.ignite.sqltests.ReplicatedSqlTest;
@@ -312,6 +313,7 @@
     PartitionedSqlTest.class,
     ReplicatedSqlTest.class,
     ReplicatedSqlCustomPartitionsTest.class,
+    CheckWarnJoinPartitionedTables.class,
 
     SqlParserCreateIndexSelfTest.class,
     SqlParserDropIndexSelfTest.class,