blob: e8b1011e9fb42e6e4cf084564b0180f0c8c53604 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
/**
 * Tests for {@code PartitionedTableRewriter.findFilteredPartitionPaths}.
 *
 * <p>{@link #setUp()} registers two external, column-partitioned tables whose partition
 * directories are created by hand on the warehouse file system:
 * <ul>
 *   <li>{@link #PARTITION_TABLE_NAME}: one partition column ({@code key}), three partitions;</li>
 *   <li>{@link #MULTIPLE_PARTITION_TABLE_NAME}: three partition columns
 *       ({@code key1/key2/key3}), three leaf partitions.</li>
 * </ul>
 * Each test plans a query, walks the logical plan down to its {@link ScanNode} and checks which
 * partition directories the rewriter returns for that scan.
 */
public class TestPartitionedTableRewriter extends QueryTestCaseBase {

  static final String PARTITION_TABLE_NAME = "tb_partition";
  static final String MULTIPLE_PARTITION_TABLE_NAME = "tb_multiple_partition";

  /**
   * Creates both partitioned fixture tables once for the whole class.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = TajoConf.getWarehouseDir(testingCluster.getConfiguration());

    Schema schema = new Schema();
    schema.addColumn("n_nationkey", TajoDataTypes.Type.INT8);
    schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
    schema.addColumn("n_regionkey", TajoDataTypes.Type.INT8);

    TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet());

    createExternalTableIncludedOnePartitionKeyColumn(fs, rootDir, schema, meta);
    createExternalTableIncludedMultiplePartitionKeyColumns(fs, rootDir, schema, meta);
  }

  /**
   * Registers {@link #PARTITION_TABLE_NAME}, partitioned by the single TEXT column {@code key},
   * and writes one data file into each of its three partition directories.
   */
  private static void createExternalTableIncludedOnePartitionKeyColumn(FileSystem fs, Path rootDir, Schema schema,
                                                                       TableMeta meta) throws Exception {
    Schema partSchema = new Schema();
    partSchema.addColumn("key", TajoDataTypes.Type.TEXT);

    PartitionMethodDesc partitionMethodDesc =
        new PartitionMethodDesc("TestPartitionedTableRewriter", PARTITION_TABLE_NAME,
            CatalogProtos.PartitionType.COLUMN, "key", partSchema);

    TableDesc tableDesc =
        createExternalPartitionedTable(fs, rootDir, PARTITION_TABLE_NAME, schema, meta, partitionMethodDesc);

    writePartition(fs, tableDesc, "key=part123", "1|ARGENTINA|1");
    writePartition(fs, tableDesc, "key=part456", "2|BRAZIL|1");
    writePartition(fs, tableDesc, "key=part789", "3|CANADA|1");
  }

  /**
   * Registers {@link #MULTIPLE_PARTITION_TABLE_NAME}, partitioned by {@code key1} (TEXT),
   * {@code key2} (TEXT) and {@code key3} (INT8), and writes one data file into each of its
   * three leaf partition directories.
   */
  private static void createExternalTableIncludedMultiplePartitionKeyColumns(FileSystem fs, Path rootDir,
                                                                             Schema schema, TableMeta meta)
      throws Exception {
    Schema partSchema = new Schema();
    partSchema.addColumn("key1", TajoDataTypes.Type.TEXT);
    partSchema.addColumn("key2", TajoDataTypes.Type.TEXT);
    partSchema.addColumn("key3", TajoDataTypes.Type.INT8);

    PartitionMethodDesc partitionMethodDesc =
        new PartitionMethodDesc("TestPartitionedTableRewriter", MULTIPLE_PARTITION_TABLE_NAME,
            CatalogProtos.PartitionType.COLUMN, "key1,key2,key3", partSchema);

    TableDesc tableDesc = createExternalPartitionedTable(fs, rootDir, MULTIPLE_PARTITION_TABLE_NAME, schema, meta,
        partitionMethodDesc);

    // mkdirs on the leaf directory also creates the intermediate key1=/key2= directories.
    writePartition(fs, tableDesc, "key1=part123/key2=supp123/key3=1", "1|ARGENTINA|1");
    writePartition(fs, tableDesc, "key1=part123/key2=supp123/key3=2", "2|BRAZIL|1");
    writePartition(fs, tableDesc, "key1=part789/key2=supp789/key3=3", "3|CANADA|1");
  }

  /**
   * Creates the table directory under {@code rootDir}, registers it as an external partitioned
   * table through the client and returns the table description read back from the client.
   */
  private static TableDesc createExternalPartitionedTable(FileSystem fs, Path rootDir, String tableName,
                                                          Schema schema, TableMeta meta,
                                                          PartitionMethodDesc partitionMethodDesc)
      throws Exception {
    Path tablePath = new Path(rootDir, tableName);
    fs.mkdirs(tablePath);

    client.createExternalTable(tableName, schema, tablePath.toUri(), meta, partitionMethodDesc);

    TableDesc tableDesc = client.getTableDesc(tableName);
    assertNotNull(tableDesc);
    return tableDesc;
  }

  /**
   * Creates the partition directory {@code partitionDir} (relative to the table URI) and writes
   * {@code row} into a file named {@code data} inside it.
   */
  private static void writePartition(FileSystem fs, TableDesc tableDesc, String partitionDir, String row)
      throws Exception {
    Path path = new Path(tableDesc.getUri().toString() + "/" + partitionDir);
    fs.mkdirs(path);
    FileUtil.writeTextToFile(row, new Path(path, "data"));
  }

  /**
   * Drops both fixture tables created by {@link #setUp()}.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    client.executeQuery("DROP TABLE IF EXISTS " + PARTITION_TABLE_NAME + " PURGE;");
    client.executeQuery("DROP TABLE IF EXISTS " + MULTIPLE_PARTITION_TABLE_NAME + " PURGE;");
  }

  /**
   * Parses and plans {@code sql}, asserts that the plan root is a ROOT node and returns the
   * projection node directly beneath it.
   */
  private ProjectionNode planProjection(String sql) throws Exception {
    Expr expr = sqlParser.parse(sql);
    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration());
    LogicalPlan newPlan = planner.createPlan(defaultContext, expr);
    LogicalNode plan = newPlan.getRootBlock().getRoot();

    assertEquals(NodeType.ROOT, plan.getType());
    LogicalRootNode root = (LogicalRootNode) plan;

    ProjectionNode projNode = root.getChild();
    return projNode;
  }

  /** Asserts that {@code node} is a SORT node and returns it as such. */
  private static SortNode expectSort(LogicalNode node) {
    assertEquals(NodeType.SORT, node.getType());
    return (SortNode) node;
  }

  /** Asserts that {@code node} is a SELECTION node carrying a qual and returns it as such. */
  private static SelectionNode expectSelection(LogicalNode node) {
    assertEquals(NodeType.SELECTION, node.getType());
    SelectionNode selNode = (SelectionNode) node;
    assertTrue(selNode.hasQual());
    return selNode;
  }

  /** Asserts that {@code node} is a SCAN node and returns it as such. */
  private static ScanNode expectScan(LogicalNode node) {
    assertEquals(NodeType.SCAN, node.getType());
    return (ScanNode) node;
  }

  /**
   * Returns the scan node beneath {@code selNode} after copying the selection's qual onto it.
   * NOTE(review): presumably the rewriter reads the predicate from the scan node itself, which
   * is why the tests push it down by hand - confirm against PartitionedTableRewriter.
   */
  private static ScanNode expectScanWithQualOf(SelectionNode selNode) {
    LogicalNode child = selNode.getChild();
    ScanNode scanNode = expectScan(child);
    scanNode.setQual(selNode.getQual());
    return scanNode;
  }

  /**
   * Runs the rewriter's partition pruning for {@code scanNode} with the test session variables
   * and returns the resulting (asserted non-null) partition paths.
   */
  private static Path[] findFilteredPartitionPaths(ScanNode scanNode) throws Exception {
    PartitionedTableRewriter rewriter = new PartitionedTableRewriter();
    // Named sessionVars so it does not shadow the inherited static 'conf'.
    OverridableConf sessionVars = CommonTestingUtil.getSessionVarsForTest();

    Path[] filteredPaths = rewriter.findFilteredPartitionPaths(sessionVars, scanNode);
    assertNotNull(filteredPaths);
    return filteredPaths;
  }

  /**
   * Asserts that {@code leaf} is a three-level partition path ending in
   * {@code key1Dir/key2Dir/key3Dir}.
   */
  private static void assertPartitionPath(Path leaf, String key1Dir, String key2Dir, String key3Dir) {
    assertEquals(key3Dir, leaf.getName());
    assertEquals(key2Dir, leaf.getParent().getName());
    assertEquals(key1Dir, leaf.getParent().getParent().getName());
  }

  /** Asserts that {@code paths} are exactly the three partitions of the single-key table, in order. */
  private static void assertAllSingleKeyPartitions(Path[] paths) {
    assertEquals(3, paths.length);
    assertEquals("key=part123", paths[0].getName());
    assertEquals("key=part456", paths[1].getName());
    assertEquals("key=part789", paths[2].getName());
  }

  /** An equality filter on the partition key selects exactly the matching partition. */
  @Test
  public void testFilterIncludePartitionKeyColumn() throws Exception {
    ProjectionNode projNode =
        planProjection("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part456' ORDER BY key");
    SortNode sortNode = expectSort(projNode.getChild());
    SelectionNode selNode = expectSelection(sortNode.getChild());
    ScanNode scanNode = expectScanWithQualOf(selNode);

    Path[] filteredPaths = findFilteredPartitionPaths(scanNode);
    assertEquals(1, filteredPaths.length);
    assertEquals("key=part456", filteredPaths[0].getName());
  }

  /** Without any filter, every partition is returned. */
  @Test
  public void testWithoutAnyFilters() throws Exception {
    ProjectionNode projNode = planProjection("SELECT * FROM " + PARTITION_TABLE_NAME + " ORDER BY key");
    SortNode sortNode = expectSort(projNode.getChild());
    ScanNode scanNode = expectScan(sortNode.getChild());

    assertAllSingleKeyPartitions(findFilteredPartitionPaths(scanNode));
  }

  /** A filter on a partition value that has no directory yields an empty result. */
  @Test
  public void testFilterIncludeNonExistingPartitionValue() throws Exception {
    ProjectionNode projNode =
        planProjection("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part123456789'");
    SelectionNode selNode = expectSelection(projNode.getChild());
    ScanNode scanNode = expectScanWithQualOf(selNode);

    Path[] filteredPaths = findFilteredPartitionPaths(scanNode);
    assertEquals(0, filteredPaths.length);
  }

  /** A filter on a non-partition column does not prune any partition. */
  @Test
  public void testFilterIncludeNonPartitionKeyColumn() throws Exception {
    String sql = "SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE n_nationkey = 1";
    ProjectionNode projNode = planProjection(sql);
    SelectionNode selNode = expectSelection(projNode.getChild());
    ScanNode scanNode = expectScanWithQualOf(selNode);

    assertAllSingleKeyPartitions(findFilteredPartitionPaths(scanNode));
  }

  /** Equality filters on all three partition keys select the single matching leaf partition. */
  @Test
  public void testFilterIncludeEveryPartitionKeyColumn() throws Exception {
    ProjectionNode projNode = planProjection("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME
        + " WHERE key1 = 'part789' and key2 = 'supp789' and key3=3");
    SelectionNode selNode = expectSelection(projNode.getChild());
    ScanNode scanNode = expectScanWithQualOf(selNode);

    Path[] filteredPaths = findFilteredPartitionPaths(scanNode);
    assertEquals(1, filteredPaths.length);
    assertPartitionPath(filteredPaths[0], "key1=part789", "key2=supp789", "key3=3");
  }

  /** Filters on only the first two partition keys return every leaf partition beneath them. */
  @Test
  public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception {
    ProjectionNode projNode = planProjection("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME
        + " WHERE key1 = 'part123' and key2 = 'supp123' order by n_nationkey");
    SortNode sortNode = expectSort(projNode.getChild());
    SelectionNode selNode = expectSelection(sortNode.getChild());
    ScanNode scanNode = expectScanWithQualOf(selNode);

    Path[] filteredPaths = findFilteredPartitionPaths(scanNode);
    assertEquals(2, filteredPaths.length);
    assertPartitionPath(filteredPaths[0], "key1=part123", "key2=supp123", "key3=1");
    assertPartitionPath(filteredPaths[1], "key1=part123", "key2=supp123", "key3=2");
  }

  /**
   * A filter mixing a partition key with a non-partition column prunes on the partition key
   * only: both leaf partitions under {@code key1=part123} are returned.
   */
  @Test
  public void testFilterIncludeNonPartitionKeyColumns() throws Exception {
    ProjectionNode projNode = planProjection("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME
        + " WHERE key1 = 'part123' and n_nationkey >= 2 order by n_nationkey");
    SortNode sortNode = expectSort(projNode.getChild());
    SelectionNode selNode = expectSelection(sortNode.getChild());
    ScanNode scanNode = expectScanWithQualOf(selNode);

    Path[] filteredPaths = findFilteredPartitionPaths(scanNode);
    assertEquals(2, filteredPaths.length);
    assertPartitionPath(filteredPaths[0], "key1=part123", "key2=supp123", "key3=1");
    assertPartitionPath(filteredPaths[1], "key1=part123", "key2=supp123", "key3=2");
  }

  /**
   * Pruning against a table populated through CTAS: a range filter on the FLOAT8 partition key
   * returns only the partitions whose value satisfies {@code key <= 40.0}.
   */
  @Test
  public final void testPartitionPruningWitCTAS() throws Exception {
    String tableName = "testPartitionPruningUsingDirectories".toLowerCase();
    String canonicalTableName = CatalogUtil.getCanonicalTableName("\"TestPartitionedTableRewriter\"", tableName);

    // Close the CTAS result immediately; only the table it creates is needed.
    executeString(
        "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) "
            + " as select l_orderkey, l_partkey, l_quantity from default.lineitem").close();

    try {
      TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), tableName);
      assertNotNull(tableDesc);

      // With a filter which checks a partition key column
      ProjectionNode projNode =
          planProjection("SELECT * FROM " + canonicalTableName + " WHERE key <= 40.0 ORDER BY key");
      SortNode sortNode = expectSort(projNode.getChild());
      SelectionNode selNode = expectSelection(sortNode.getChild());
      ScanNode scanNode = expectScanWithQualOf(selNode);

      Path[] filteredPaths = findFilteredPartitionPaths(scanNode);
      assertEquals(3, filteredPaths.length);
      assertEquals("key=17.0", filteredPaths[0].getName());
      assertEquals("key=36.0", filteredPaths[1].getName());
      assertEquals("key=38.0", filteredPaths[2].getName());
    } finally {
      // Always drop the CTAS table, even when an assertion above fails.
      executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
    }
  }
}