/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.query;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.jdbc.FetchResultSet;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.worker.TajoWorker;
import org.junit.Test;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
import static org.junit.Assert.*;
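/**
* Tests for column-partitioned tables: creating partitioned tables, inserting into them
* (including INSERT OVERWRITE and inserts with fewer expressions than target columns), the
* resulting partition directory layout and per-file compression on the file system, and
* queries that read from partitioned tables.
*/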
public class TestTablePartitions extends QueryTestCaseBase {
public TestTablePartitions() throws IOException {
super(TajoConstants.DEFAULT_DATABASE_NAME);
}
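/**
* Creates a column-partitioned table, runs an INSERT OVERWRITE from lineitem, and checks that
* the shuffle feeding the INSERT execution block is a SCATTERED_HASH_SHUFFLE with a single
* shuffle key.
*/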
@Test
public final void testCreateColumnPartitionedTable() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
res = testBase.execute(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
"l_quantity from lineitem");
MasterPlan plan = getQueryPlan(res);
ExecutionBlock rootEB = plan.getRoot();
/*
-------------------------------------------------------------------------------
|-eb_1405354886454_0001_000003
|-eb_1405354886454_0001_000002
|-eb_1405354886454_0001_000001
*/
assertEquals(1, plan.getChildCount(rootEB.getId()));
ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
assertNotNull(insertEB);
assertEquals(NodeType.INSERT, insertEB.getPlan().getType());
assertEquals(1, plan.getChildCount(insertEB.getId()));
ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
assertEquals(1, list.size());
DataChannel channel = list.get(0);
assertNotNull(channel);
assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
assertEquals(1, channel.getShuffleKeys().length);
res.close();
}
@Test
public final void testCreateColumnPartitionedTableWithJoin() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
res = testBase.execute(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
"l_quantity from lineitem join orders on l_orderkey = o_orderkey");
MasterPlan plan = getQueryPlan(res);
ExecutionBlock rootEB = plan.getRoot();
/*
-------------------------------------------------------------------------------
|-eb_1405356074433_0001_000005
|-eb_1405356074433_0001_000004
|-eb_1405356074433_0001_000003
|-eb_1405356074433_0001_000002
|-eb_1405356074433_0001_000001
*/
assertEquals(1, plan.getChildCount(rootEB.getId()));
ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
assertNotNull(insertEB);
assertEquals(NodeType.INSERT, insertEB.getPlan().getType());
assertEquals(1, plan.getChildCount(insertEB.getId()));
ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
assertEquals(1, list.size());
DataChannel channel = list.get(0);
assertNotNull(channel);
assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
assertEquals(1, channel.getShuffleKeys().length);
res.close();
}
@Test
public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithSelectedColumns");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
"l_partkey, l_quantity from lineitem");
res.close();
}
@Test
public final void testColumnPartitionedTableByOneColumn() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumn");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName
+ " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
assertPartitionDirectories(desc);
res = executeString(
"select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
for (int i = 0; i < 2; i++) {
assertTrue(res.next());
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2));
}
res.close();
}
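/**
* Asserts that the key=17.0 through key=49.0 partition directories created from the lineitem
* data exist under the table path and, unless the HCatalog store is used, that the table
* statistics report five rows.
*/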
private void assertPartitionDirectories(TableDesc desc) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(desc.getPath());
assertTrue(fs.isDirectory(path));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
}
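/**
* Runs case1.sql through case13.sql against a column-partitioned table, covering expressions,
* casts, predicates, and aliases on the partition column.
*/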
@Test
public final void testQueryCasesOnColumnPartitionedTable() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testQueryCasesOnColumnPartitionedTable");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName
+ " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
assertPartitionDirectories(desc);
res = executeFile("case1.sql");
assertResultSet(res, "case1.result");
res.close();
res = executeFile("case2.sql");
assertResultSet(res, "case2.result");
res.close();
res = executeFile("case3.sql");
assertResultSet(res, "case3.result");
res.close();
// select pow(key, 2) from testQueryCasesOnColumnPartitionedTable
res = executeFile("case4.sql");
assertResultSet(res, "case4.result");
res.close();
// select round(pow(key + 1, 2)) from testQueryCasesOnColumnPartitionedTable
res = executeFile("case5.sql");
assertResultSet(res, "case5.result");
res.close();
// select col1, key from testQueryCasesOnColumnPartitionedTable order by pow(key, 2) desc
res = executeFile("case6.sql");
assertResultSet(res, "case6.result");
res.close();
// select col1, key from testQueryCasesOnColumnPartitionedTable WHERE key BETWEEN 35 AND 48;
res = executeFile("case7.sql");
assertResultSet(res, "case7.result");
res.close();
// select col1, CASE key WHEN 36 THEN key WHEN 49 THEN key ELSE key END from testQueryCasesOnColumnPartitionedTable;
res = executeFile("case8.sql");
assertResultSet(res, "case8.result");
res.close();
// select col1, CAST(key AS INT) from testQueryCasesOnColumnPartitionedTable;
res = executeFile("case9.sql");
assertResultSet(res, "case9.result");
res.close();
// select col1, (!(key > 35)) from testQueryCasesOnColumnPartitionedTable;
res = executeFile("case10.sql");
assertResultSet(res, "case10.result");
res.close();
// alias partition column
res = executeFile("case11.sql");
assertResultSet(res, "case11.result");
res.close();
// alias partition column in group by, order by
res = executeFile("case12.sql");
assertResultSet(res, "case12.result");
res.close();
// alias partition column in stage
res = executeFile("case13.sql");
assertResultSet(res, "case13.result");
res.close();
}
@Test
public final void testColumnPartitionedTableByThreeColumns() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumns");
ResultSet res = testBase.execute(
"create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
res.close();
TajoTestingCluster cluster = testBase.getTestingCluster();
CatalogService catalog = cluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
Path path = new Path(desc.getPath());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.isDirectory(path));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
for (int i = 0; i < 2; i++) {
assertTrue(res.next());
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
Map<Double, int []> resultRows2 = Maps.newHashMap();
resultRows2.put(49.0d, new int[]{3, 3});
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
for (int i = 0; i < 3; i++) {
assertTrue(res.next());
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
}
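/**
* Repeatedly inserts into a table partitioned by three columns and verifies that a plain INSERT
* appends to existing partitions, that INSERT OVERWRITE does not remove existing partition
* directories, and that an INSERT OVERWRITE producing an empty result leaves existing data
* untouched.
*/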
@Test
public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns");
ResultSet res = testBase.execute(
"create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
res.close();
TajoTestingCluster cluster = testBase.getTestingCluster();
CatalogService catalog = cluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
Path path = new Path(desc.getPath());
FileSystem fs = FileSystem.get(conf);
verifyDirectoriesForThreeColumns(fs, path, 1);
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
for (int i = 0; i < 2; i++) {
assertTrue(res.next());
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
Map<Double, int []> resultRows2 = Maps.newHashMap();
resultRows2.put(49.0d, new int[]{3, 3});
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
for (int i = 0; i < 3; i++) {
assertTrue(res.next());
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
// insert into an already existing partitioned table
res = executeString("insert into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
path = new Path(desc.getPath());
verifyDirectoriesForThreeColumns(fs, path, 2);
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
String expected = "N\n" +
"N\n" +
"N\n" +
"N\n" +
"N\n" +
"N\n" +
"R\n" +
"R\n" +
"R\n" +
"R\n";
String tableData = getTableFileContents(new Path(desc.getPath()));
assertEquals(expected, tableData);
res = executeString("select * from " + tableName + " where col2 = 2");
String resultSetData = resultSetToString(res);
res.close();
expected = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,2,2,38.0\n" +
"N,2,2,38.0\n" +
"R,3,2,45.0\n" +
"R,3,2,45.0\n";
assertEquals(expected, resultSetData);
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
resultSetData = resultSetToString(res);
res.close();
expected = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,2,2,38.0\n" +
"N,2,2,38.0\n" +
"R,3,2,45.0\n" +
"R,3,2,45.0\n" +
"R,3,3,49.0\n" +
"R,3,3,49.0\n";
assertEquals(expected, resultSetData);
// Check that existing partition directories are not removed.
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem "
+ " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1");
res.close();
verifyDirectoriesForThreeColumns(fs, path, 3);
if (!testingCluster.isHCatalogStoreRunning()) {
// TODO: If another partition directory already exists, its row count must be added to the resulting row count.
// desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
// assertEquals(6, desc.getStats().getNumRows().intValue());
}
verifyKeptExistingData(tableName);
// insert overwrite with an empty result into the partitioned table
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey > 100");
res.close();
verifyDirectoriesForThreeColumns(fs, path, 4);
verifyKeptExistingData(tableName);
executeString("DROP TABLE " + tableName + " PURGE").close();
}
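/**
* Verifies that the col2 = 1 partitions still contain the rows from the earlier inserts together
* with the 30.0 row added by the partial INSERT OVERWRITE.
*/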
private void verifyKeptExistingData(String tableName) throws Exception {
ResultSet res = executeString("select * from " + tableName + " where col2 = 1");
String resultSetData = resultSetToString(res);
res.close();
String expected = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,1,1,17.0\n" +
"N,1,1,17.0\n" +
"N,1,1,30.0\n" +
"N,1,1,36.0\n" +
"N,1,1,36.0\n";
assertEquals(expected, resultSetData);
}
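/**
* Checks the col1/col2/col3 partition directory layout. The step argument is the ordinal of the
* insert statement that has just run; from the third step on, the additional col3=30.0 partition
* under col1=1/col2=1 is also expected.
*/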
private void verifyDirectoriesForThreeColumns(FileSystem fs, Path path, int step) throws Exception {
assertTrue(fs.isDirectory(path));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
if (step == 1 || step == 2) {
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
} else {
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
}
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
}
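/**
* Verifies that every data file written under the partition directories of a table created with
* the DeflateCodec compression option is actually compressed with that codec.
*/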
@Test
public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression");
ResultSet res = executeString(
"create table " + tableName + " (col2 int4, col3 float8) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION BY column(col1 int4)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getPath())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getPath());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
for (FileStatus partition : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition.getPath()));
for (FileStatus file : fs.listStatus(partition.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
@Test
public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByTwoColumnsWithCompression");
ResultSet res = executeString("create table " + tableName + " (col3 float8, col4 text) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION by column(col1 int4, col2 int4)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName +
" select l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getPath())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getPath());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
for (FileStatus partition1 : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition1.getPath()));
for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
assertTrue(fs.isDirectory(partition2.getPath()));
for (FileStatus file : fs.listStatus(partition2.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
}
@Test
public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumnsWithCompression");
ResultSet res = executeString(
"create table " + tableName + " (col4 text) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName +
" select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getPath())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getPath());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
for (FileStatus partition1 : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition1.getPath()));
for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
assertTrue(fs.isDirectory(partition2.getPath()));
for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
assertTrue(fs.isDirectory(partition3.getPath()));
for (FileStatus file : fs.listStatus(partition3.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
}
res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
int i = 0;
while (res.next()) {
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
i++;
}
res.close();
assertEquals(2, i);
Map<Double, int []> resultRows2 = Maps.newHashMap();
resultRows2.put(49.0d, new int[]{3, 3});
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
i = 0;
while(res.next()) {
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
i++;
}
res.close();
assertEquals(3, i);
}
@Test
public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableNoMatchedPartition");
ResultSet res = executeString(
"create table " + tableName + " (col4 text) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName +
" select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getPath())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getPath());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
for (FileStatus partition1 : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition1.getPath()));
for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
assertTrue(fs.isDirectory(partition2.getPath()));
for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
assertTrue(fs.isDirectory(partition3.getPath()));
for (FileStatus file : fs.listStatus(partition3.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
}
res = executeString("select * from " + tableName + " where col2 = 9");
assertFalse(res.next());
res.close();
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions1() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions1");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into " + tableName
+ " select l_orderkey, l_partkey from lineitem");
assertTrue(response.hasErrorMessage());
assertEquals("INSERT has smaller expressions than target columns\n", response.getErrorMessage());
res = executeFile("case14.sql");
assertResultSet(res, "case14.result");
res.close();
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions2() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions2");
ResultSet res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into " + tableName
+ " select l_returnflag , l_orderkey, l_partkey from lineitem");
assertTrue(response.hasErrorMessage());
assertEquals("INSERT has smaller expressions than target columns\n", response.getErrorMessage());
res = executeFile("case15.sql");
assertResultSet(res, "case15.result");
res.close();
}
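// Note: despite its name, this case verifies INSERT OVERWRITE between tables in two different
// databases rather than an insert with fewer expressions than target columns.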
@Test
public final void testColumnPartitionedTableWithSmallerExpressions3() throws Exception {
ResultSet res = executeString("create database testinsertquery1;");
res.close();
res = executeString("create database testinsertquery2;");
res.close();
res = executeString("create table testinsertquery1.table1 " +
"(col1 int4, col2 int4, col3 float8)");
res.close();
res = executeString("create table testinsertquery2.table1 " +
"(col1 int4, col2 int4, col3 float8)");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable("testinsertquery1", "table1"));
assertTrue(catalog.existsTable("testinsertquery2", "table1"));
res = executeString("insert overwrite into testinsertquery1.table1 " +
"select l_orderkey, l_partkey, l_quantity from default.lineitem;");
res.close();
TableDesc desc = catalog.getTableDesc("testinsertquery1", "table1");
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
res = executeString("insert overwrite into testinsertquery2.table1 " +
"select col1, col2, col3 from testinsertquery1.table1;");
res.close();
desc = catalog.getTableDesc("testinsertquery2", "table1");
if (!testingCluster.isHCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5");
ResultSet res = executeString(
"create table " + tableName + " (col1 text) partition by column(col2 text) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");
res.close();
res = executeString("select * from " + tableName);
assertResultSet(res);
res.close();
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6");
ResultSet res = executeString(
"create table " + tableName + " (col1 text) partition by column(col2 text) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1");
res.close();
res = executeString("select * from " + tableName);
assertResultSet(res);
res.close();
}
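/**
* Looks up the MasterPlan of the query behind the given result set by asking each worker's
* QueryMaster for the matching QueryMasterTask, and fails the test if no worker has it.
*/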
private MasterPlan getQueryPlan(ResultSet res) {
QueryId queryId;
if (res instanceof TajoMemoryResultSet) {
queryId = ((TajoMemoryResultSet) res).getQueryId();
} else {
queryId = ((FetchResultSet) res).getQueryId();
}
for (TajoWorker eachWorker : testingCluster.getTajoWorkers()) {
QueryMasterTask queryMasterTask = eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
if (queryMasterTask != null) {
return queryMasterTask.getQuery().getPlan();
}
}
fail("Can't find query from workers" + queryId);
return null;
}
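/**
* Exercises the scattered hash shuffle path by shrinking the distributed-query partition volume
* and the shuffle hash appender page volume, inserting several megabytes of generated rows into
* a partitioned table, and checking that every generated row can be read back.
*/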
@Test
public void testScatteredHashShuffle() throws Exception {
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, "2");
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname, "1");
try {
KeyValueSet tableOptions = new KeyValueSet();
tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
Schema schema = new Schema();
schema.addColumn("col1", TajoDataTypes.Type.TEXT);
schema.addColumn("col2", TajoDataTypes.Type.TEXT);
List<String> data = new ArrayList<String>();
int totalBytes = 0;
String col2Data = "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" +
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" +
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2";
int index = 0;
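// Generate rows until the raw input exceeds 4MB, well above the small volumes configured above.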
while (true) {
int col1Value = 1;
String str = col1Value + "|col2-" + index + "-" + col2Data;
data.add(str);
totalBytes += str.getBytes().length;
if (totalBytes > 4 * 1024 * 1024) {
break;
}
index++;
}
TajoTestingCluster.createTable("testscatteredhashshuffle", schema, tableOptions, data.toArray(new String[]{}), 3);
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable("default", "testscatteredhashshuffle"));
executeString("create table test_partition (col2 text) partition by column (col1 text)").close();
executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close();
ResultSet res = executeString("select col1 from test_partition");
int numRows = 0;
while (res.next()) {
numRows++;
}
assertEquals(data.size(), numRows);
// TODO: also assert the size of the generated data files
} finally {
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal);
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname,
TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.defaultVal);
executeString("DROP TABLE test_partition PURGE").close();
executeString("DROP TABLE testScatteredHashShuffle PURGE").close();
}
}
@Test
public final void TestSpecialCharPartitionKeys1() throws Exception {
// See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters.
executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)")
.close();
executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
.close();
ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR'");
String resStr = resultSetToString(res);
String expected =
"id,name,type\n" +
"-------------------------------\n"
+ "3,NONE,AIR\n"
+ "3,TEST SPECIAL CHARS,RA:*?><I/L#%S\n";
assertEquals(expected, resStr);
cleanupQuery(res);
}
@Test
public final void TestSpecialCharPartitionKeys2() throws Exception {
// See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters.
executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)")
.close();
executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
.close();
ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'");
assertResultSet(res);
cleanupQuery(res);
res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'");
assertResultSet(res);
cleanupQuery(res);
}
@Test
public final void testIgnoreFilesInIntermediateDir() throws Exception {
// See - TAJO-1219: Files located in intermediate directories of partitioned table should be ignored
// It verifies that Tajo ignores files located in intermediate directories of a partitioned table.
Path testDir = CommonTestingUtil.getTestDir();
executeString(
"CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " +
"LOCATION '" + testDir + "'");
FileSystem fs = testDir.getFileSystem(conf);
FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
fos.write("a|b|c".getBytes());
fos.close();
ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;");
assertFalse(res.next());
res.close();
}
}