// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.utframe.UtFrameUtils;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class DistributedPlannerTest {
    private static String runningDir = "fe/mocked/DistributedPlannerTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext ctx;
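
    // Start a minimal mocked Doris cluster and create db1 with an aggregate table (tbl1)
    // and a duplicate table (tbl2) that the planner tests below run against.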
@BeforeClass
public static void setUp() throws Exception {
UtFrameUtils.createMinDorisCluster(runningDir);
ctx = UtFrameUtils.createDefaultCtx();
String createDbStmtStr = "create database db1;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
Catalog.getCurrentCatalog().createDb(createDbStmt);
// create table tbl1
String createTblStmtStr = "create table db1.tbl1(k1 int, k2 varchar(32), v bigint sum) "
+ "AGGREGATE KEY(k1,k2) distributed by hash(k1) buckets 1 properties('replication_num' = '1');";
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
Catalog.getCurrentCatalog().createTable(createTableStmt);
// create table tbl2
createTblStmtStr = "create table db1.tbl2(k3 int, k4 varchar(32)) "
+ "DUPLICATE KEY(k3) distributed by hash(k3) buckets 1 properties('replication_num' = '1');";
createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
Catalog.getCurrentCatalog().createTable(createTableStmt);
}
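
    // Delete the temporary directory used by the mocked cluster.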
@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(runningDir));
}
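
    // A partitioned input fragment should be wrapped by createAssertFragment into a new
    // unpartitioned fragment whose plan root is the AssertNumRowsNode.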
@Test
public void testAssertFragmentWithDistributedInput(@Injectable AssertNumRowsNode assertNumRowsNode,
@Injectable PlanFragment inputFragment,
@Injectable PlanNodeId planNodeId,
@Injectable PlanFragmentId planFragmentId,
@Injectable PlanNode inputPlanRoot,
@Injectable TupleId tupleId,
@Mocked PlannerContext plannerContext) {
DistributedPlanner distributedPlanner = new DistributedPlanner(plannerContext);
List<TupleId> tupleIdList = Lists.newArrayList(tupleId);
Set<TupleId> tupleIdSet = Sets.newHashSet(tupleId);
Deencapsulation.setField(inputPlanRoot, "tupleIds", tupleIdList);
Deencapsulation.setField(inputPlanRoot, "tblRefIds", tupleIdList);
Deencapsulation.setField(inputPlanRoot, "nullableTupleIds", Sets.newHashSet(tupleId));
Deencapsulation.setField(inputPlanRoot, "conjuncts", Lists.newArrayList());
new Expectations() {
{
inputFragment.isPartitioned();
result = true;
plannerContext.getNextNodeId();
result = planNodeId;
plannerContext.getNextFragmentId();
result = planFragmentId;
inputFragment.getPlanRoot();
result = inputPlanRoot;
inputPlanRoot.getTupleIds();
result = tupleIdList;
inputPlanRoot.getTblRefIds();
result = tupleIdList;
inputPlanRoot.getNullableTupleIds();
result = tupleIdSet;
assertNumRowsNode.getChildren();
result = inputPlanRoot;
}
};
PlanFragment assertFragment = Deencapsulation.invoke(distributedPlanner, "createAssertFragment",
assertNumRowsNode, inputFragment);
Assert.assertFalse(assertFragment.isPartitioned());
Assert.assertSame(assertNumRowsNode, assertFragment.getPlanRoot());
}
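
    // An unpartitioned input fragment should be reused by createAssertFragment, with the
    // AssertNumRowsNode placed on top as its new plan root.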
@Test
public void testAssertFragmentWithUnpartitionInput(@Injectable AssertNumRowsNode assertNumRowsNode,
@Injectable PlanFragment inputFragment,
@Mocked PlannerContext plannerContext) {
DistributedPlanner distributedPlanner = new DistributedPlanner(plannerContext);
PlanFragment assertFragment = Deencapsulation.invoke(distributedPlanner, "createAssertFragment",
assertNumRowsNode, inputFragment);
Assert.assertSame(assertFragment, inputFragment);
Assert.assertTrue(assertFragment.getPlanRoot() instanceof AssertNumRowsNode);
}
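
    // Explicit join hints should be respected: [BROADCAST] produces "INNER JOIN (BROADCAST)"
    // and [SHUFFLE] produces "INNER JOIN (PARTITIONED)" in the explain output.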
@Test
public void testExplicitlyBroadcastJoin() throws Exception {
String sql = "explain select * from db1.tbl1 join [BROADCAST] db1.tbl2 on tbl1.k1 = tbl2.k3";
StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql);
stmtExecutor.execute();
Planner planner = stmtExecutor.planner();
List<PlanFragment> fragments = planner.getFragments();
String plan = planner.getExplainString(fragments, TExplainLevel.NORMAL);
Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (BROADCAST)"));
sql = "explain select * from db1.tbl1 join [SHUFFLE] db1.tbl2 on tbl1.k1 = tbl2.k3";
stmtExecutor = new StmtExecutor(ctx, sql);
stmtExecutor.execute();
planner = stmtExecutor.planner();
fragments = planner.getFragments();
plan = planner.getExplainString(fragments, TExplainLevel.NORMAL);
Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (PARTITIONED)"));
}
}