blob: 2a9dc13c5dc52036e10bb77f177ad00505ddee6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.planner;
import java.util.Collection;
import java.util.Map;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
import org.apache.samza.sql.interfaces.DslConverter;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.junit.Test;
import static org.junit.Assert.*;
/**
 * Tests for the Samza SQL query planner: DSL-to-plan conversion and the join filter push-down
 * behavior toggled by {@link SamzaSqlApplicationConfig#CFG_SQL_ENABLE_PLAN_OPTIMIZER}.
 */
public class TestQueryPlanner {

  /** Verifies that a single INSERT statement using a UDF converts into exactly one plan root. */
  @Test
  public void testTranslate() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
    String sql =
        "Insert into testavro.outputTopic(id) select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10";
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
    DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
    Collection<RelRoot> relRoots = dslConverter.convertDsl(sql);
    assertEquals(1, relRoots.size());
  }

  @Test
  public void testRemoteJoinWithFilter() throws SamzaSqlValidatorException {
    testRemoteJoinWithFilterHelper(false);
  }

  @Test
  public void testRemoteJoinWithUdfAndFilter() throws SamzaSqlValidatorException {
    testRemoteJoinWithUdfAndFilterHelper(false);
  }

  @Test
  public void testRemoteJoinWithFilterAndOptimizer() throws SamzaSqlValidatorException {
    testRemoteJoinWithFilterHelper(true);
  }

  @Test
  public void testRemoteJoinWithUdfAndFilterAndOptimizer() throws SamzaSqlValidatorException {
    testRemoteJoinWithUdfAndFilterHelper(true);
  }

  /**
   * Plans a stream (PAGEVIEW) to remote-table (Profile) join whose WHERE clause mixes predicates that
   * reference the remote table with a stream-only predicate ({@code pv.profileId = 1}), and checks that
   * the stream-only predicate is pushed below the join only when the optimizer is enabled.
   *
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   */
  void testRemoteJoinWithFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
            + " p.name as profileName, p.address as profileAddress "
            + "from testavro.PAGEVIEW as pv "
            + "join testRemoteStore.Profile.`$table` as p "
            + " on p.__key__ = pv.profileId"
            + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = 1";
    Collection<RelRoot> relRoots = convertSql(staticConfigs, sql, enableOptimizer);

    /*
    Query plan without optimization:
    LogicalProject(__key__=[$1], pageKey=[$1], companyName=['N/A'], profileName=[$5], profileAddress=[$7])
      LogicalFilter(condition=[AND(=($5, $1), =($5, 'Mike'), =($2, 1))])
        LogicalJoin(condition=[=($3, $2)], joinType=[inner])
          LogicalTableScan(table=[[testavro, PAGEVIEW]])
          LogicalTableScan(table=[[testRemoteStore, Profile, $table]])

    Query plan with optimization:
    LogicalProject(__key__=[$1], pageKey=[$1], companyName=['N/A'], profileName=[$5], profileAddress=[$7])
      LogicalFilter(condition=[AND(=($5, $1), =($5, 'Mike'))])
        LogicalJoin(condition=[=($3, $2)], joinType=[inner])
          LogicalFilter(condition=[=($2, 1)])
            LogicalTableScan(table=[[testavro, PAGEVIEW]])
          LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
    */

    assertEquals(1, relRoots.size());
    RelNode relNode = relRoots.iterator().next().rel;
    assertTrue(relNode instanceof LogicalProject);

    relNode = relNode.getInput(0);
    assertTrue(relNode instanceof LogicalFilter);
    String filterCondition = ((LogicalFilter) relNode).getCondition().toString();
    if (enableOptimizer) {
      // The stream-only predicate no longer appears above the join.
      assertEquals("AND(=($1, $5), =($5, 'Mike'))", filterCondition);
    } else {
      assertEquals("AND(=(1, $2), =($1, $5), =($5, 'Mike'))", filterCondition);
    }

    relNode = relNode.getInput(0);
    assertTrue(relNode instanceof LogicalJoin);
    assertEquals(2, relNode.getInputs().size());
    LogicalJoin join = (LogicalJoin) relNode;
    RelNode left = join.getLeft();
    RelNode right = join.getRight();
    assertTrue(right instanceof LogicalTableScan);
    if (enableOptimizer) {
      // The stream-only predicate now filters the stream (left) input before the join.
      assertTrue(left instanceof LogicalFilter);
      assertEquals("=(1, $2)", ((LogicalFilter) left).getCondition().toString());
      assertTrue(left.getInput(0) instanceof LogicalTableScan);
    } else {
      assertTrue(left instanceof LogicalTableScan);
    }
  }

  /**
   * Plans a remote-table (Profile) to stream (PAGEVIEW) join whose join key is computed by a UDF on the
   * stream side, and checks that the stream-only predicate ({@code pv.profileId = 1}) is pushed onto the
   * stream input only when the optimizer is enabled.
   *
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   */
  void testRemoteJoinWithUdfAndFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
            + " p.name as profileName, p.address as profileAddress "
            + "from testRemoteStore.Profile.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.__key__ = BuildOutputRecord('id', pv.profileId)"
            + " where p.name = 'Mike' and pv.profileId = 1 and p.name = pv.pageKey";
    Collection<RelRoot> relRoots = convertSql(staticConfigs, sql, enableOptimizer);

    /*
    Query plan without optimization:
    LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
      LogicalFilter(condition=[AND(=($2, 'Mike'), =($10, 1), =($2, $9))]) ==> Only the second condition could be pushed down.
        LogicalProject(__key__=[$0], id=[$1], name=[$2], companyId=[$3], address=[$4], selfEmployed=[$5],
          phoneNumbers=[$6], mapValues=[$7], __key__0=[$8], pageKey=[$9], profileId=[$10])
          ==> ProjectMergeRule removes this redundant node.
          LogicalJoin(condition=[=($0, $11)], joinType=[inner])
            LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
            LogicalProject(__key__=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)]) ==> Filter is pushed above project.
              LogicalTableScan(table=[[testavro, PAGEVIEW]])

    Query plan with optimization:
    LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
      LogicalFilter(condition=[AND(=($2, 'Mike'), =($2, $9))])
        LogicalJoin(condition=[=($0, $11)], joinType=[inner])
          LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
          LogicalFilter(condition=[=($2, 1)])
            LogicalProject(__key__=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)])
              LogicalTableScan(table=[[testavro, PAGEVIEW]])
    */

    assertEquals(1, relRoots.size());
    RelNode relNode = relRoots.iterator().next().rel;
    assertTrue(relNode instanceof LogicalProject);

    relNode = relNode.getInput(0);
    assertTrue(relNode instanceof LogicalFilter);
    String filterCondition = ((LogicalFilter) relNode).getCondition().toString();
    if (enableOptimizer) {
      assertEquals("AND(=($2, $9), =($2, 'Mike'))", filterCondition);
    } else {
      assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))", filterCondition);
    }

    relNode = relNode.getInput(0);
    if (!enableOptimizer) {
      // Without optimization a redundant LogicalProject sits between the filter and the join.
      assertTrue(relNode instanceof LogicalProject);
      relNode = relNode.getInput(0);
    }
    // Check the node type in both modes so a plan regression fails an assertion rather than
    // throwing ClassCastException on the cast below.
    assertTrue(relNode instanceof LogicalJoin);
    assertEquals(2, relNode.getInputs().size());
    LogicalJoin join = (LogicalJoin) relNode;
    RelNode left = join.getLeft();
    RelNode right = join.getRight();
    assertTrue(left instanceof LogicalTableScan);

    if (enableOptimizer) {
      // The stream-only predicate is placed on the stream (right) input, above its UDF projection.
      assertTrue(right instanceof LogicalFilter);
      assertEquals("=(1, $2)", ((LogicalFilter) right).getCondition().toString());
      relNode = right.getInput(0);
    } else {
      relNode = right;
    }
    assertTrue(relNode instanceof LogicalProject);
    relNode = relNode.getInput(0);
    assertTrue(relNode instanceof LogicalTableScan);
  }

  /**
   * Verifies that, for a local stream-table join, enabling the optimizer does not change the plan,
   * since no join filter optimizations exist for local joins yet.
   */
  @Test
  public void testLocalStreamTableInnerJoinFilterOptimization() throws Exception {
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
            + " p.address as profileAddress "
            + "from testavro.PROFILE.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.id = pv.profileId "
            + "where p.name = 'Mike'";
    String planWithOptimization = explainSinglePlan(convertSql(staticConfigs, sql, true));
    String planWithoutOptimization = explainSinglePlan(convertSql(staticConfigs, sql, false));

    // We do not yet have any join filter optimizations for local joins. Hence the plans with and without
    // optimization should be the same.
    assertEquals(planWithOptimization, planWithoutOptimization);
  }

  /**
   * Verifies that a stream-only predicate containing a UDF ({@code pv.profileId = MyTest(pv.profileId)})
   * is pushed onto the stream input of a remote-table join when the optimizer is enabled.
   */
  @Test
  public void testRemoteJoinFilterPushDownWithUdfInFilterAndOptimizer() throws SamzaSqlValidatorException {
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
            + " p.name as profileName, p.address as profileAddress "
            + "from testRemoteStore.Profile.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.__key__ = pv.profileId"
            + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = MyTest(pv.profileId)";
    Collection<RelRoot> relRoots = convertSql(staticConfigs, sql, true);

    /*
    Query plan without optimization:
    LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
      LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'), =($10, CAST(MyTest($10)):INTEGER))])
        LogicalJoin(condition=[=($0, $10)], joinType=[inner])
          LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
          LogicalTableScan(table=[[testavro, PAGEVIEW]])

    Query plan with optimization:
    LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
      LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'))])
        LogicalJoin(condition=[=($0, $10)], joinType=[inner])
          LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
          LogicalFilter(condition=[=($2, CAST(MyTest($2)):INTEGER)])
            LogicalTableScan(table=[[testavro, PAGEVIEW]])
    */

    assertEquals(1, relRoots.size());
    RelNode relNode = relRoots.iterator().next().rel;
    assertTrue(relNode instanceof LogicalProject);

    relNode = relNode.getInput(0);
    assertTrue(relNode instanceof LogicalFilter);
    assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());

    relNode = relNode.getInput(0);
    assertTrue(relNode instanceof LogicalJoin);
    assertEquals(2, relNode.getInputs().size());
    LogicalJoin join = (LogicalJoin) relNode;
    RelNode left = join.getLeft();
    RelNode right = join.getRight();
    assertTrue(left instanceof LogicalTableScan);
    assertTrue(right instanceof LogicalFilter);
    assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter) right).getCondition().toString());
    assertTrue(right.getInput(0) instanceof LogicalTableScan);
  }

  /**
   * Verifies that a predicate whose UDF argument references the remote table
   * ({@code pv.profileId = MyTestPoly(p.name)}) is not pushed down, so enabling the optimizer leaves
   * the plan unchanged.
   */
  @Test
  public void testRemoteJoinNoFilterPushDownWithUdfInFilterAndOptimizer() throws SamzaSqlValidatorException {
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
            + " p.name as profileName, p.address as profileAddress "
            + "from testRemoteStore.Profile.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.__key__ = pv.profileId"
            + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = MyTestPoly(p.name)";
    String planWithOptimization = explainSinglePlan(convertSql(staticConfigs, sql, true));
    String planWithoutOptimization = explainSinglePlan(convertSql(staticConfigs, sql, false));

    /*
    LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
      LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'), =($10, CAST(MyTestPoly($2)):INTEGER))])
        LogicalJoin(condition=[=($0, $10)], joinType=[inner])
          LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
          LogicalTableScan(table=[[testavro, PAGEVIEW]])
    */

    // None of the conditions in the filter could be pushed down as they all require a remote call. Hence the plans
    // with and without optimization should be the same.
    assertEquals(planWithOptimization, planWithoutOptimization);
  }

  /**
   * Converts {@code sql} into plan roots with the plan optimizer switched on or off.
   *
   * <p>Writes the SQL statement and the optimizer flag into {@code staticConfigs} (mutating it), then
   * builds a new {@link MapConfig} and a new {@link DslConverter} from that map on every call.
   *
   * @param staticConfigs base Samza SQL configuration; mutated by this method
   * @param sql the SQL statement to convert
   * @param enableOptimizer value written to {@code CFG_SQL_ENABLE_PLAN_OPTIMIZER}
   * @return the plan roots produced by the converter
   */
  private static Collection<RelRoot> convertSql(Map<String, String> staticConfigs, String sql,
      boolean enableOptimizer) {
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
    Config samzaConfig = new MapConfig(staticConfigs);
    DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
    return dslConverter.convertDsl(sql);
  }

  /**
   * Asserts that exactly one plan root was produced and returns its plan rendered at
   * {@link SqlExplainLevel#EXPPLAN_ATTRIBUTES}, suitable for comparing two plans textually.
   */
  private static String explainSinglePlan(Collection<RelRoot> relRoots) {
    assertEquals(1, relRoots.size());
    return RelOptUtil.toString(relRoots.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
  }
}