blob: 06061a38281c5ac5f97a8d4f829475a15b888cc4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.planner;
import static org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.rel.AbstractIgniteJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.type.NativeTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* Test suite to verify join colocation.
*/
public class JoinColocationPlannerTest extends AbstractPlannerTest {
/**
* Join of the same tables with a simple affinity is expected to be colocated.
*/
@Test
public void joinSameTableSimpleAffMergeJoin() throws Exception {
IgniteTable tbl = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
IgniteSchema schema = createSchema(tbl);
String sql = "select count(*) "
+ "from TEST_TBL t1 "
+ "join TEST_TBL t2 on t1.id = t2.id";
RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
assertThat(invalidPlanMsg, join, notNullValue());
assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
}
/**
* Join of the same tables with a simple affinity is expected to be colocated.
*/
@Test
public void joinSameTableSimpleAffHashJoin() throws Exception {
IgniteTable tbl = simpleTable("TEST_TBL", DEFAULT_TBL_SIZE);
IgniteSchema schema = createSchema(tbl);
String sql = "select count(*) "
+ "from TEST_TBL t1 "
+ "join TEST_TBL t2 on t1.id = t2.id";
// Only hash join
RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "MergeJoinConverter");
AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
List<RelNode> joinNodes = findNodes(phys, byClass(AbstractIgniteJoin.class));
String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
assertThat(invalidPlanMsg, joinNodes.size(), equalTo(1));
assertThat(invalidPlanMsg, join, notNullValue());
assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
}
/**
* Join of the same tables with a complex affinity is expected to be colocated.
*/
@ParameterizedTest(name = "DISABLED: {0}")
@ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
public void joinSameTableComplexAff(String disabledRule) throws Exception {
IgniteTable tbl = complexTbl("TEST_TBL");
IgniteSchema schema = createSchema(tbl);
String sql = "select count(*) "
+ "from TEST_TBL t1 "
+ "join TEST_TBL t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
assertThat(invalidPlanMsg, join, notNullValue());
assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
if (!"MergeJoinConverter".equals(disabledRule)) {
assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
}
}
/**
* Join of the same tables with a complex affinity is expected to be colocated.
*/
@ParameterizedTest(name = "DISABLED: {0}")
@ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
public void joinDiffTablesComplexAff(String disabledRule) throws Exception {
IgniteTable tbl1 = complexTbl("TEST_TBL1");
IgniteTable tbl2 = complexTbl("TEST_TBL2");
IgniteSchema schema = createSchema(tbl1, tbl2);
String sql = "select count(*) "
+ "from TEST_TBL1 t1 "
+ "join TEST_TBL2 t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
assertThat(invalidPlanMsg, join, notNullValue());
assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
if (!"MergeJoinConverter".equals(disabledRule)) {
assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
}
}
/**
* Join of the same tables with a complex affinity is expected to be colocated.
*/
@ParameterizedTest(name = "DISABLED: {0}")
@ValueSource(strings = {"HashJoinConverter", "MergeJoinConverter"})
public void joinDiffTablesNotSupersetComplexAff(String disabledRule) throws Exception {
IgniteTable tbl1 = complexTbl("TEST_TBL1");
IgniteTable tbl2 = complexTbl("TEST_TBL2");
IgniteSchema schema = createSchema(tbl1, tbl2);
String sql = "select count(*) "
+ "from TEST_TBL1 t1 "
+ "join TEST_TBL2 t2 on t1.id1 = t2.id1";
RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule);
AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class));
String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
assertThat(invalidPlanMsg, join, notNullValue());
assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(false));
}
/**
* Re-hashing based on simple affinity.
*
* <p>The smaller table should be sent to the bigger one.
*/
@Test
public void joinComplexToSimpleAff() throws Exception {
IgniteTable complexTbl = complexTbl("COMPLEX_TBL", 2 * DEFAULT_TBL_SIZE,
IgniteDistributions.affinity(ImmutableIntList.of(0, 1), nextTableId(), DEFAULT_ZONE_ID));
IgniteTable simpleTbl = simpleTable("SIMPLE_TBL", DEFAULT_TBL_SIZE);
IgniteSchema schema = createSchema(complexTbl, simpleTbl);
String sql = "select count(*) "
+ "from COMPLEX_TBL t1 "
+ "join SIMPLE_TBL t2 on t1.id1 = t2.id and t1.id2 = t2.id2";
assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
.and(hasDistribution(complexTbl.distribution()))
.and(input(0, isInstanceOf(IgniteIndexScan.class)
.and(scan -> complexTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
))
.and(input(1, isInstanceOf(IgniteSort.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteTableScan.class)
.and(scan -> simpleTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
))
))
))
), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
}
/**
* Re-hashing for complex affinity.
*
* <p>The smaller table should be sent to the bigger one.
*/
@Test
public void joinComplexToComplexAffWithDifferentOrder() throws Exception {
IgniteTable complexTblDirect = complexTbl(
"COMPLEX_TBL_DIRECT",
2 * DEFAULT_TBL_SIZE,
IgniteDistributions.affinity(ImmutableIntList.of(0, 1), nextTableId(), DEFAULT_ZONE_ID));
IgniteTable complexTblIndirect = complexTbl(
"COMPLEX_TBL_INDIRECT",
DEFAULT_TBL_SIZE,
IgniteDistributions.affinity(ImmutableIntList.of(1, 0), nextTableId(), DEFAULT_ZONE_ID));
IgniteSchema schema = createSchema(complexTblDirect, complexTblIndirect);
String sql = "select count(*) "
+ "from COMPLEX_TBL_DIRECT t1 "
+ "join COMPLEX_TBL_INDIRECT t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
.and(hasDistribution(single()))
.and(input(0, isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteIndexScan.class)
.and(scan -> complexTblDirect.equals(scan.getTable().unwrap(IgniteTable.class)))
))
))
.and(input(1, isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteIndexScan.class)
.and(scan -> complexTblIndirect.equals(scan.getTable().unwrap(IgniteTable.class)))
))
))
), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter");
}
static IgniteTable simpleTable(String tableName, int size) {
return TestBuilders.table()
.name(tableName)
.size(size)
.distribution(someAffinity())
.addColumn("ID", NativeTypes.INT32)
.addColumn("ID2", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.STRING)
.sortedIndex()
.name("PK")
.addColumn("ID", Collation.ASC_NULLS_LAST)
.end()
.build();
}
static IgniteTable simpleTableHashPk(String tableName, int size) {
return TestBuilders.table()
.name(tableName)
.size(size)
.distribution(someAffinity())
.addColumn("ID", NativeTypes.INT32)
.addColumn("ID2", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.STRING)
.hashIndex()
.name("PK")
.addColumn("ID")
.end()
.build();
}
static IgniteTable complexTbl(String tableName) {
return complexTbl(tableName, DEFAULT_TBL_SIZE,
IgniteDistributions.affinity(ImmutableIntList.of(0, 1), nextTableId(), DEFAULT_ZONE_ID));
}
private static IgniteTable complexTbl(String tableName, int size, IgniteDistribution distribution) {
return TestBuilders.table()
.name(tableName)
.size(size)
.distribution(distribution)
.addColumn("ID1", NativeTypes.INT32)
.addColumn("ID2", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.STRING)
.sortedIndex()
.name("PK")
.addColumn("ID1", Collation.ASC_NULLS_LAST)
.addColumn("ID2", Collation.ASC_NULLS_LAST)
.end()
.build();
}
}