/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.query;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.exception.ReturnStateUtil;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.*;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestTablePartitions extends QueryTestCaseBase {
private NodeType nodeType;
public TestTablePartitions(NodeType nodeType) throws IOException {
super(TajoConstants.DEFAULT_DATABASE_NAME);
this.nodeType = nodeType;
}
@Parameters(name = "{index}: {0}")
public static Collection<Object[]> generateParameters() {
return Arrays.asList(new Object[][] {
//type
{NodeType.INSERT},
{NodeType.CREATE_TABLE},
});
}
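/**
* Creates a column-partitioned table either by INSERT OVERWRITE into a pre-created table or by
* CREATE TABLE ... AS, depending on the test parameter. It then checks that the master plan feeds
* the store stage through a SCATTERED_HASH_SHUFFLE channel with a single shuffle key, and that the
* partition directories recorded in the catalog exist on the file system.
*/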
@Test
public final void testCreateColumnPartitionedTable() throws Exception {
ResultSet res;
String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable");
ClientProtos.SubmitQueryResponse response;
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
response = client.executeQuery(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
"l_quantity from lineitem");
} else {
response = client.executeQuery(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) "
+ " as select l_orderkey, l_partkey, l_quantity from lineitem");
}
QueryId queryId = new QueryId(response.getQueryId());
testingCluster.waitForQuerySubmitted(queryId, 10);
QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId);
assertNotNull(queryMasterTask);
TajoClientUtil.waitCompletion(client, queryId);
MasterPlan plan = queryMasterTask.getQuery().getPlan();
ExecutionBlock rootEB = plan.getRoot();
assertEquals(1, plan.getChildCount(rootEB.getId()));
ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
assertNotNull(insertEB);
assertEquals(nodeType, insertEB.getPlan().getType());
assertEquals(1, plan.getChildCount(insertEB.getId()));
ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
assertEquals(1, list.size());
DataChannel channel = list.get(0);
assertNotNull(channel);
assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
assertEquals(1, channel.getShuffleKeys().length);
TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
tableDesc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
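/**
* Same scenario as {@link #testCreateColumnPartitionedTable()}, except that the source query joins
* lineitem with orders, so the shuffle characteristics are also verified for a join input.
*/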
@Test
public final void testCreateColumnPartitionedTableWithJoin() throws Exception {
ResultSet res;
ClientProtos.SubmitQueryResponse response;
String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
response = client.executeQuery(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
"l_quantity from lineitem join orders on l_orderkey = o_orderkey");
} else {
response = client.executeQuery("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "
+ " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey");
}
QueryId queryId = new QueryId(response.getQueryId());
testingCluster.waitForQuerySubmitted(queryId, 10);
QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId);
assertNotNull(queryMasterTask);
TajoClientUtil.waitCompletion(client, queryId);
MasterPlan plan = queryMasterTask.getQuery().getPlan();
ExecutionBlock rootEB = plan.getRoot();
assertEquals(1, plan.getChildCount(rootEB.getId()));
ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
assertNotNull(insertEB);
assertEquals(nodeType, insertEB.getPlan().getType());
assertEquals(1, plan.getChildCount(insertEB.getId()));
ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
assertEquals(1, list.size());
DataChannel channel = list.get(0);
assertNotNull(channel);
assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
assertEquals(1, channel.getShuffleKeys().length);
TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
tableDesc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
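/**
* Loads a column-partitioned table while leaving null_col unset (an explicit column list for the
* INSERT path, a NULL literal for the CTAS path) and verifies the partition directories registered
* in the catalog.
*/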
@Test
public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithSelectedColumns");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
"l_partkey, l_quantity from lineitem");
} else {
res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4)"
+ " partition by column(key float8) AS select l_orderkey, l_partkey, null, l_quantity from lineitem");
}
res.close();
TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
tableDesc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
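/**
* Partitions a table by a single float8 column, asserts the expected key=... directories
* (see assertPartitionDirectories), and spot-checks rows filtered by the partition key.
*/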
@Test
public final void testColumnPartitionedTableByOneColumn() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumn");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName
+ " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4) "
+ " partition by column(key float8) as select l_orderkey, l_partkey, null, l_quantity from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
assertPartitionDirectories(desc);
res = executeString(
"select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
for (int i = 0; i < 2; i++) {
assertTrue(res.next());
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2));
}
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName,
new String[]{"key"}, desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
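/**
* Asserts the key=... directory layout (17.0, 36.0, 38.0, 45.0, 49.0) produced by partitioning
* lineitem by l_quantity, plus the expected row count when the HiveCatalogStore is not in use.
*/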
private void assertPartitionDirectories(TableDesc desc) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(desc.getUri());
assertTrue(fs.isDirectory(path));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
}
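/**
* Runs the canned queries case1.sql through case13.sql (arithmetic, CASE, CAST, BETWEEN, and
* aliasing of the partition column, among others) against a column-partitioned table and compares
* the output with the corresponding *.result files.
*/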
@Test
public final void testQueryCasesOnColumnPartitionedTable() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testQueryCasesOnColumnPartitionedTable");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName
+ " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4) "
+ " partition by column(key float8) as select l_orderkey, l_partkey, null, l_quantity from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
assertPartitionDirectories(desc);
res = executeFile("case1.sql");
assertResultSet(res, "case1.result");
res.close();
res = executeFile("case2.sql");
assertResultSet(res, "case2.result");
res.close();
res = executeFile("case3.sql");
assertResultSet(res, "case3.result");
res.close();
// select pow(key, 2) from testQueryCasesOnColumnPartitionedTable
res = executeFile("case4.sql");
assertResultSet(res, "case4.result");
res.close();
// select round(pow(key + 1, 2)) from testQueryCasesOnColumnPartitionedTable
res = executeFile("case5.sql");
assertResultSet(res, "case5.result");
res.close();
// select col1, key from testQueryCasesOnColumnPartitionedTable order by pow(key, 2) desc
res = executeFile("case6.sql");
assertResultSet(res, "case6.result");
res.close();
// select col1, key from testQueryCasesOnColumnPartitionedTable WHERE key BETWEEN 35 AND 48;
res = executeFile("case7.sql");
assertResultSet(res, "case7.result");
res.close();
// select col1, CASE key WHEN 36 THEN key WHEN 49 THEN key ELSE key END from testQueryCasesOnColumnPartitionedTable;
res = executeFile("case8.sql");
assertResultSet(res, "case8.result");
res.close();
// select col1, CAST(key AS INT) from testQueryCasesOnColumnPartitionedTable;
res = executeFile("case9.sql");
assertResultSet(res, "case9.result");
res.close();
// select col1, (!(key > 35)) from testQueryCasesOnColumnPartitionedTable;
res = executeFile("case10.sql");
assertResultSet(res, "case10.result");
res.close();
// alias partition column
res = executeFile("case11.sql");
assertResultSet(res, "case11.result");
res.close();
// alias partition column in group by, order by
res = executeFile("case12.sql");
assertResultSet(res, "case12.result");
res.close();
// alias partition column in stage
res = executeFile("case13.sql");
assertResultSet(res, "case13.result");
res.close();
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
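/**
* Partitions a table by three columns and verifies the nested col1=/col2=/col3= directory layout
* as well as predicate evaluation over multiple partition columns.
*/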
@Test
public final void testColumnPartitionedTableByThreeColumns() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumns");
if (nodeType == NodeType.INSERT) {
res = testBase.execute(
"create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
res.close();
TajoTestingCluster cluster = testBase.getTestingCluster();
CatalogService catalog = cluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString( "create table " + tableName + " (col4 text) "
+ " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
"l_quantity from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
Path path = new Path(desc.getUri());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.isDirectory(path));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
for (int i = 0; i < 2; i++) {
assertTrue(res.next());
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
Map<Double, int []> resultRows2 = Maps.newHashMap();
resultRows2.put(49.0d, new int[]{3, 3});
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
for (int i = 0; i < 3; i++) {
assertTrue(res.next());
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
}
res = executeString("select * from " + tableName + " WHERE (col1 ='1' or col1 = '100') and col3 > 20");
String result = resultSetToString(res);
String expectedResult = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,1,1,36.0\n";
res.close();
assertEquals(expectedResult, result);
res = executeString("SELECT col1, col2, col3 FROM " + tableName);
res.close();
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
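/**
* Exercises INSERT INTO on an existing partitioned table: appending the same data twice,
* overwriting a single partition, and overwriting with an empty result. Existing partition
* directories must be kept in every case.
*/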
@Test
public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns");
if (nodeType == NodeType.INSERT) {
res = testBase.execute(
"create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
res.close();
TajoTestingCluster cluster = testBase.getTestingCluster();
CatalogService catalog = cluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString( "create table " + tableName + " (col4 text) "
+ " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
"l_quantity from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
desc.getStats().getNumRows());
Path path = new Path(desc.getUri());
FileSystem fs = FileSystem.get(conf);
verifyDirectoriesForThreeColumns(fs, path, 1);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
for (int i = 0; i < 2; i++) {
assertTrue(res.next());
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
Map<Double, int []> resultRows2 = Maps.newHashMap();
resultRows2.put(49.0d, new int[]{3, 3});
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
for (int i = 0; i < 3; i++) {
assertTrue(res.next());
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
}
res.close();
// insert into an already existing partitioned table
res = executeString("insert into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
res.close();
desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
// TODO: When inserting into an already existing partitioned table, the table stats should be updated correctly.
// verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
// desc.getStats().getNumRows());
path = new Path(desc.getUri());
verifyDirectoriesForThreeColumns(fs, path, 2);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
String expected = "N\n" +
"N\n" +
"N\n" +
"N\n" +
"N\n" +
"N\n" +
"R\n" +
"R\n" +
"R\n" +
"R\n";
String tableData = getTableFileContents(new Path(desc.getUri()));
assertEquals(expected, tableData);
res = executeString("select * from " + tableName + " where col2 = 2");
String resultSetData = resultSetToString(res);
res.close();
expected = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,2,2,38.0\n" +
"N,2,2,38.0\n" +
"R,3,2,45.0\n" +
"R,3,2,45.0\n";
assertEquals(expected, resultSetData);
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
resultSetData = resultSetToString(res);
res.close();
expected = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,2,2,38.0\n" +
"N,2,2,38.0\n" +
"R,3,2,45.0\n" +
"R,3,2,45.0\n" +
"R,3,3,49.0\n" +
"R,3,3,49.0\n";
assertEquals(expected, resultSetData);
// Check that existing partition directories are not removed.
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem "
+ " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1");
res.close();
verifyDirectoriesForThreeColumns(fs, path, 3);
if (!testingCluster.isHiveCatalogStoreRunning()) {
// TODO: If another partition directory already exists, its row count must be added to the resulting row count.
// desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
// assertEquals(6, desc.getStats().getNumRows().intValue());
}
verifyKeptExistingData(tableName);
// insert overwrite an empty result into the partitioned table
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey > 100");
res.close();
verifyDirectoriesForThreeColumns(fs, path, 4);
verifyKeptExistingData(tableName);
executeString("DROP TABLE " + tableName + " PURGE").close();
}
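/**
* Checks that the rows stored under the col2=1 partitions are still readable after the subsequent
* INSERT OVERWRITE statements in testInsertIntoColumnPartitionedTableByThreeColumns.
*/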
private void verifyKeptExistingData(String tableName) throws Exception {
ResultSet res = executeString("select * from " + tableName + " where col2 = 1 order by col4, col1, col2, col3");
String resultSetData = resultSetToString(res);
res.close();
String expected = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,1,1,17.0\n" +
"N,1,1,17.0\n" +
"N,1,1,30.0\n" +
"N,1,1,36.0\n" +
"N,1,1,36.0\n";
assertEquals(expected, resultSetData);
}
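/**
* Verifies the nested partition directories for the three-column tests. The step argument reflects
* how far testInsertIntoColumnPartitionedTableByThreeColumns has progressed: from step 3 on, the
* additional col1=1/col2=1/col3=30.0 directory created by the single-partition overwrite must exist.
*/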
private void verifyDirectoriesForThreeColumns(FileSystem fs, Path path, int step) throws Exception {
assertTrue(fs.isDirectory(path));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
// From step 3 on, the single-partition INSERT OVERWRITE has created an additional directory.
if (step >= 3) {
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
}
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
}
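/**
* Builds a partitioned table stored as delimited text with DeflateCodec and asserts that every
* file found under the partition directories is actually Deflate-compressed.
*/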
@Test
public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col2 int4, col3 float8) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION BY column(col1 int4)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
} else {
res = executeString(
"create table " + tableName + " (col2 int4, col3 float8) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION BY column(col1 int4) as select l_partkey, l_quantity, l_orderkey from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getUri())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getUri());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
for (FileStatus partition : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition.getPath()));
for (FileStatus file : fs.listStatus(partition.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
@Test
public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByTwoColumnsWithCompression");
if (nodeType == NodeType.INSERT) {
res = executeString("create table " + tableName + " (col3 float8, col4 text) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION by column(col1 int4, col2 int4)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName +
" select l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
} else {
res = executeString("create table " + tableName + " (col3 float8, col4 text) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION by column(col1 int4, col2 int4) as select l_quantity, l_returnflag, l_orderkey, " +
"l_partkey from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getUri())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getUri());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
for (FileStatus partition1 : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition1.getPath()));
for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
assertTrue(fs.isDirectory(partition2.getPath()));
for (FileStatus file : fs.listStatus(partition2.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
@Test
public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumnsWithCompression");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col4 text) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName +
" select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString("create table " + tableName + " (col4 text) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
"l_quantity from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getUri())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getUri());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
for (FileStatus partition1 : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition1.getPath()));
for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
assertTrue(fs.isDirectory(partition2.getPath()));
for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
assertTrue(fs.isDirectory(partition3.getPath()));
for (FileStatus file : fs.listStatus(partition3.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
}
res = executeString("select * from " + tableName + " where col2 = 2");
Map<Double, int []> resultRows1 = Maps.newHashMap();
resultRows1.put(45.0d, new int[]{3, 2});
resultRows1.put(38.0d, new int[]{2, 2});
int i = 0;
while (res.next()) {
assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
i++;
}
res.close();
assertEquals(2, i);
Map<Double, int []> resultRows2 = Maps.newHashMap();
resultRows2.put(49.0d, new int[]{3, 3});
resultRows2.put(45.0d, new int[]{3, 2});
resultRows2.put(38.0d, new int[]{2, 2});
res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
i = 0;
while(res.next()) {
assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
i++;
}
res.close();
assertEquals(3, i);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
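/**
* Queries a compressed, three-column-partitioned table with a predicate that matches no partition
* (col2 = 9) and expects an empty result instead of an error.
*/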
@Test
public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableNoMatchedPartition");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col4 text) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8)");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName +
" select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString("create table " + tableName + " (col4 text) USING text " +
"WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag , l_orderkey, l_partkey, " +
"l_quantity from lineitem");
}
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(new Path(desc.getUri())));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = new Path(desc.getUri());
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
for (FileStatus partition1 : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition1.getPath()));
for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
assertTrue(fs.isDirectory(partition2.getPath()));
for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
assertTrue(fs.isDirectory(partition3.getPath()));
for (FileStatus file : fs.listStatus(partition3.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
}
res = executeString("select * from " + tableName + " where col2 = 9");
assertFalse(res.next());
res.close();
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
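/**
* Submits an INSERT whose select list is shorter than the target column list and expects the query
* to be rejected with "INSERT has smaller expressions than target columns", then runs case14.sql
* and compares the output with case14.result.
*/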
@Test
public final void testColumnPartitionedTableWithSmallerExpressions1() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions1");
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into " + tableName
+ " select l_orderkey, l_partkey from lineitem");
assertTrue(ReturnStateUtil.isError(response.getState()));
assertEquals("INSERT has smaller expressions than target columns", response.getState().getMessage());
res = executeFile("case14.sql");
assertResultSet(res, "case14.result");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions2() throws Exception {
ResultSet res = null;
ClientProtos.SubmitQueryResponse response = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions2");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
response = client.executeQuery("insert overwrite into " + tableName
+ " select l_returnflag , l_orderkey, l_partkey from lineitem");
assertTrue(ReturnStateUtil.isError(response.getState()));
assertEquals("INSERT has smaller expressions than target columns", response.getState().getMessage());
res = executeFile("case15.sql");
assertResultSet(res, "case15.result");
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
}
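/**
* Despite its name, this case covers INSERT OVERWRITE / CTAS across databases (testinsertquery1
* and testinsertquery2) and checks the resulting row counts.
*/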
@Test
public final void testColumnPartitionedTableWithSmallerExpressions3() throws Exception {
ResultSet res = executeString("create database testinsertquery1;");
res.close();
res = executeString("create database testinsertquery2;");
res.close();
if (nodeType == NodeType.INSERT) {
res = executeString("create table testinsertquery1.table1 " +
"(col1 int4, col2 int4, col3 float8)");
res.close();
res = executeString("create table testinsertquery2.table1 " +
"(col1 int4, col2 int4, col3 float8)");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable("testinsertquery1", "table1"));
assertTrue(catalog.existsTable("testinsertquery2", "table1"));
res = executeString("insert overwrite into testinsertquery1.table1 " +
"select l_orderkey, l_partkey, l_quantity from default.lineitem;");
res.close();
} else {
res = executeString("create table testinsertquery1.table1 " +
"(col1 int4, col2 int4, col3 float8) as select l_orderkey, l_partkey, l_quantity from default.lineitem;");
res.close();
}
TableDesc desc = catalog.getTableDesc("testinsertquery1", "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
if (nodeType == NodeType.INSERT) {
res = executeString("insert overwrite into testinsertquery2.table1 " +
"select col1, col2, col3 from testinsertquery1.table1;");
res.close();
} else {
res = executeString("create table testinsertquery2.table1 " +
"(col1 int4, col2 int4, col3 float8) as select col1, col2, col3 from testinsertquery1.table1;");
res.close();
}
desc = catalog.getTableDesc("testinsertquery2", "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
executeString("DROP TABLE testinsertquery1.table1 PURGE").close();
executeString("DROP TABLE testinsertquery2.table1 PURGE").close();
executeString("DROP DATABASE testinsertquery1").close();
executeString("DROP DATABASE testinsertquery2").close();
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 text) partition by column(col2 text) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");
} else {
res = executeString("create table " + tableName + " (col1 text) partition by column(col2 text) " +
" as select l_returnflag, null from lineitem");
}
res.close();
res = executeString("select * from " + tableName);
assertResultSet(res);
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
@Test
public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6");
if (nodeType == NodeType.INSERT) {
res = executeString(
"create table " + tableName + " (col1 text) partition by column(col2 text) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString(
"insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1");
} else {
res = executeString( "create table " + tableName + " (col1 text) partition by column(col2 text) " +
" as select l_returnflag, null from lineitem where l_orderkey = 1");
}
res.close();
res = executeString("select * from " + tableName);
assertResultSet(res);
res.close();
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"},
desc.getStats().getNumRows());
executeString("DROP TABLE " + tableName + " PURGE").close();
}
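/**
* Shrinks $DIST_QUERY_TABLE_PARTITION_VOLUME and SHUFFLE_HASH_APPENDER_PAGE_VOLUME, loads a few
* megabytes of generated rows that all share a single partition key value, and checks that the
* scattered hash shuffle still writes every row into the partitioned target table.
*/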
@Test
public void testScatteredHashShuffle() throws Exception {
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, "2");
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname, "1");
try {
KeyValueSet tableOptions = new KeyValueSet();
tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
Schema schema = SchemaBuilder.builder()
.add("col1", TajoDataTypes.Type.TEXT)
.add("col2", TajoDataTypes.Type.TEXT)
.build();
List<String> data = new ArrayList<>();
int totalBytes = 0;
Random rand = new Random(System.currentTimeMillis());
String col2Data = "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" +
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" +
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2";
int index = 0;
while(true) {
int col1RandomValue = 1;
String str = col1RandomValue + "|col2-" + index + "-" + col2Data;
data.add(str);
totalBytes += str.getBytes().length;
if (totalBytes > 4 * 1024 * 1024) {
break;
}
index++;
}
TajoTestingCluster.createTable("testscatteredhashshuffle", schema, tableOptions, data.toArray(new String[data.size()]), 3);
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable("default", "testscatteredhashshuffle"));
if (nodeType == NodeType.INSERT) {
executeString("create table test_partition (col2 text) partition by column (col1 text)").close();
executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close();
} else {
executeString("create table test_partition (col2 text) PARTITION BY COLUMN (col1 text) AS select col2, " +
"col1 from testscatteredhashshuffle").close();
}
ResultSet res = executeString("select col1 from test_partition");
int numRows = 0;
while (res.next()) {
numRows++;
}
assertEquals(data.size(), numRows);
TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, "test_partition");
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, "test_partition", new String[]{"col1"},
desc.getStats().getNumRows());
} finally {
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal);
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname,
TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.defaultVal);
executeString("DROP TABLE test_partition PURGE").close();
executeString("DROP TABLE testScatteredHashShuffle PURGE").close();
}
}
@Test
public final void TestSpecialCharPartitionKeys1() throws Exception {
// See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters.
executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
if (nodeType == NodeType.INSERT) {
executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)")
.close();
executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
.close();
} else {
executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)" +
" AS SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
.close();
}
ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR'");
String resStr = resultSetToString(res);
String expected =
"id,name,type\n" +
"-------------------------------\n"
+ "3,NONE,AIR\n"
+ "3,TEST SPECIAL CHARS,RA:*?><I/L#%S\n";
assertEquals(expected, resStr);
cleanupQuery(res);
executeString("DROP TABLE pTable947 PURGE").close();
}
@Test
public final void TestSpecialCharPartitionKeys2() throws Exception {
// See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters.
executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
if (nodeType == NodeType.INSERT) {
executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)")
.close();
executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
.close();
} else {
executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)" +
" AS SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
.close();
}
ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'");
assertResultSet(res);
cleanupQuery(res);
res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'");
assertResultSet(res);
cleanupQuery(res);
executeString("DROP TABLE pTable948 PURGE").close();
}
@Test
public final void testIgnoreFilesInIntermediateDir() throws Exception {
// See TAJO-1219: Files located in intermediate directories of a partitioned table should be ignored.
// This test verifies that Tajo ignores files located in intermediate directories of a partitioned table.
if (nodeType == NodeType.INSERT) {
Path testDir = CommonTestingUtil.getTestDir();
executeString(
"CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " +
"LOCATION '" + testDir + "'");
FileSystem fs = testDir.getFileSystem(conf);
FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
fos.write("a|b|c".getBytes());
fos.close();
ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;");
assertFalse(res.next());
res.close();
}
}
/**
* Verifies the partitions added to a table against the catalog. For every row of partition column
* values in the table, this looks up the corresponding partition (e.g. col1=1/col2=1/col3=17.0),
* asserts that its registered path points at the expected directory, and that the number of bytes
* recorded in the catalog matches the directory's size on the file system.
*
* @param databaseName database containing the table
* @param tableName name of the partitioned table
* @param partitionColumns names of the partition columns, in partition order
* @param numRows expected number of rows in the table, compared with the rows returned by the
* verification query (skipped when the HiveCatalogStore is running)
* @throws Exception if a query or a file system operation fails
*/
private void verifyPartitionDirectoryFromCatalog(String databaseName, String tableName,
String[] partitionColumns, Long numRows) throws Exception {
int rowCount = 0;
FileSystem fs = FileSystem.get(conf);
Path partitionPath = null;
// Get all partition column values
StringBuilder query = new StringBuilder();
query.append("SELECT");
for (int i = 0; i < partitionColumns.length; i++) {
String partitionColumn = partitionColumns[i];
if (i > 0) {
query.append(",");
}
query.append(" ").append(partitionColumn);
}
query.append(" FROM ").append(databaseName).append(".").append(tableName);
ResultSet res = executeString(query.toString());
StringBuilder partitionName = new StringBuilder();
PartitionDescProto partitionDescProto = null;
// Check whether each partition's directory exists.
while(res.next()) {
partitionName.delete(0, partitionName.length());
for (int i = 0; i < partitionColumns.length; i++) {
String partitionColumn = partitionColumns[i];
if (i > 0) {
partitionName.append("/");
}
partitionName.append(partitionColumn).append("=").append(res.getString(partitionColumn));
}
partitionDescProto = catalog.getPartition(databaseName, tableName, partitionName.toString());
assertNotNull(partitionDescProto);
assertTrue(partitionDescProto.getPath().indexOf(tableName + "/" + partitionName.toString()) > 0);
partitionPath = new Path(partitionDescProto.getPath());
ContentSummary cs = fs.getContentSummary(partitionPath);
assertEquals(cs.getLength(), partitionDescProto.getNumBytes());
rowCount++;
}
res.close();
// Check row count.
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(numRows, Long.valueOf(rowCount));
}
}
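/**
* Uses a UNION ALL over two copies of lineitem so that several tasks write the same partition
* keys, then checks that the catalog registers each key ('key=N' and 'key=R') exactly once.
*/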
@Test
public final void testDuplicatedPartitions() throws Exception {
String tableName = CatalogUtil.normalizeIdentifier("testDuplicatedPartitions");
try {
executeString("CREATE TABLE lineitem2 as select * from lineitem").close();
// Execute a UNION ALL statement to create multiple output files.
if (nodeType == NodeType.INSERT) {
executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key text) ").close();
executeString(
"insert overwrite into " + tableName
+ " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all"
+ " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b"
).close();
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key text) as "
+ " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all"
+ " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b"
).close();
}
// If duplicated partitions have been removed, the table will contain only the 'KEY=N' and 'KEY=R'
// partitions. Previously, Query and Stage did not delete duplicated partitions because they were kept
// in a List. To reproduce duplicated partitions, use a List instead of a Set with DerbyStore.
List<PartitionDescProto> partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, tableName);
assertEquals(2, partitions.size());
PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N");
assertNotNull(firstPartition);
PartitionDescProto secondPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=R");
assertNotNull(secondPartition);
} finally {
executeString("DROP TABLE lineitem2 PURGE");
executeString("DROP TABLE " + tableName + " PURGE");
}
}
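/**
* Verifies predicate evaluation and string handling on text partition columns: LIKE, NOT LIKE, IN,
* SIMILAR TO, regexp, and concatenation against the l_shipdate / l_returnflag partitions.
*/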
@Test
public final void testPatternMatchingPredicatesAndStringFunctions() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testPatternMatchingPredicatesAndStringFunctions");
String expectedResult;
if (nodeType == NodeType.INSERT) {
executeString("create table " + tableName
+ " (col1 int4, col2 int4) partition by column(l_shipdate text, l_returnflag text) ").close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
executeString(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_shipdate, l_returnflag from lineitem");
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(l_shipdate text, l_returnflag text) "
+ " as select l_orderkey, l_partkey, l_shipdate, l_returnflag from lineitem");
}
assertTrue(client.existTable(tableName));
// Like
res = executeString("SELECT * FROM " + tableName
+ " WHERE l_shipdate LIKE '1996%' and l_returnflag = 'N' order by l_shipdate");
expectedResult = "col1,col2,l_shipdate,l_returnflag\n" +
"-------------------------------\n" +
"1,1,1996-03-13,N\n" +
"1,1,1996-04-12,N\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Not like
res = executeString("SELECT * FROM " + tableName
+ " WHERE l_shipdate NOT LIKE '1996%' and l_returnflag IN ('R') order by l_shipdate");
expectedResult = "col1,col2,l_shipdate,l_returnflag\n" +
"-------------------------------\n" +
"3,3,1993-11-09,R\n" +
"3,2,1994-02-02,R\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// In
res = executeString("SELECT * FROM " + tableName
+ " WHERE l_shipdate IN ('1993-11-09', '1994-02-02', '1997-01-28') AND l_returnflag = 'R' order by l_shipdate");
expectedResult = "col1,col2,l_shipdate,l_returnflag\n" +
"-------------------------------\n" +
"3,3,1993-11-09,R\n" +
"3,2,1994-02-02,R\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Similar to
res = executeString("SELECT * FROM " + tableName + " WHERE l_shipdate similar to '1993%' order by l_shipdate");
expectedResult = "col1,col2,l_shipdate,l_returnflag\n" +
"-------------------------------\n" +
"3,3,1993-11-09,R\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Regular expression
res = executeString("SELECT * FROM " + tableName
+ " WHERE l_shipdate regexp '[1-2][0-9][0-9][3-9]-[0-1][0-9]-[0-3][0-9]' "
+ " AND l_returnflag <> 'N' ORDER BY l_shipdate");
expectedResult = "col1,col2,l_shipdate,l_returnflag\n" +
"-------------------------------\n" +
"3,3,1993-11-09,R\n" +
"3,2,1994-02-02,R\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Concatenate
res = executeString("SELECT * FROM " + tableName
+ " WHERE l_shipdate = ( '1996' || '-' || '03' || '-' || '13' ) order by l_shipdate");
expectedResult = "col1,col2,l_shipdate,l_returnflag\n" +
"-------------------------------\n" +
"1,1,1996-03-13,N\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
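/**
* Verifies comparison, BETWEEN, cast, interval arithmetic, and date functions (to_date, add_months)
* against a DATE partition column.
*/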
@Test
public final void testDatePartitionColumn() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testDatePartitionColumn");
String expectedResult;
if (nodeType == NodeType.INSERT) {
executeString("create table " + tableName + " (col1 int4, col2 int4) partition by column(key date) ").close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
executeString(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_shipdate from lineitem");
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key date) "
+ " as select l_orderkey, l_partkey, l_shipdate::date from lineitem");
}
assertTrue(client.existTable(tableName));
// LessThanOrEquals
res = executeString("SELECT * FROM " + tableName + " WHERE key <= date '1995-09-01' order by col1, col2, key");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,2,1994-02-02\n" +
"3,3,1993-11-09\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// LessThan and GreaterThan
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > to_date('1993-01-01', 'YYYY-MM-DD') " +
" and key < to_date('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,2,1994-02-02\n" +
"3,3,1993-11-09\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Between
res = executeString("SELECT * FROM " + tableName
+ " WHERE key between date '1993-01-01' and date '1997-01-01' order by col1, col2, key desc");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,1996-04-12\n" +
"1,1,1996-03-13\n" +
"3,2,1994-02-02\n" +
"3,3,1993-11-09\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Cast
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > '1993-01-01'::date " +
" and key < '1997-01-01'::timestamp order by col1, col2, key ");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,1996-03-13\n" +
"1,1,1996-04-12\n" +
"3,2,1994-02-02\n" +
"3,3,1993-11-09\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Interval
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > '1993-01-01'::date " +
" and key < date '1994-01-01' + interval '1 year' order by col1, col2, key ");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,2,1994-02-02\n" +
"3,3,1993-11-09\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// DateTime Function #1
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > '1993-01-01'::date " +
" and key < add_months(date '1994-01-01', 12) order by col1, col2, key ");
assertEquals(expectedResult, resultSetToString(res));
res.close();
// DateTime Function #2
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > '1993-01-01'::date " +
" and key < add_months('1994-01-01'::timestamp, 12) order by col1, col2, key ");
assertEquals(expectedResult, resultSetToString(res));
res.close();
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
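// Verifies partition pruning on a TIMESTAMP partition column built with to_timestamp(l_shipdate, 'YYYY-MM-DD').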
@Test
public final void testTimestampPartitionColumn() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testTimestampPartitionColumn");
String expectedResult;
if (nodeType == NodeType.INSERT) {
executeString("create table " + tableName
+ " (col1 int4, col2 int4) partition by column(key timestamp) ").close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
executeString(
"insert overwrite into " + tableName
+ " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem");
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key timestamp) "
+ " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem");
}
assertTrue(client.existTable(tableName));
// LessThanOrEquals
res = executeString("SELECT * FROM " + tableName
+ " WHERE key <= to_timestamp('1995-09-01', 'YYYY-MM-DD') order by col1, col2, key");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,2,1994-02-02 00:00:00\n" +
"3,3,1993-11-09 00:00:00\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// LessThan and GreaterThan
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > to_timestamp('1993-01-01', 'YYYY-MM-DD') and " +
"key < to_timestamp('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"3,2,1994-02-02 00:00:00\n" +
"3,3,1993-11-09 00:00:00\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Between
res = executeString("SELECT * FROM " + tableName
+ " WHERE key between to_timestamp('1993-01-01', 'YYYY-MM-DD') " +
"and to_timestamp('1997-01-01', 'YYYY-MM-DD') order by col1, col2, key desc");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,1996-04-12 00:00:00\n" +
"1,1,1996-03-13 00:00:00\n" +
"3,2,1994-02-02 00:00:00\n" +
"3,3,1993-11-09 00:00:00\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
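// Verifies partition pruning on a TIME partition column; key values are derived from l_shipdate via a CASE expression.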
@Test
public final void testTimePartitionColumn() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testTimePartitionColumn");
String expectedResult;
if (nodeType == NodeType.INSERT) {
executeString("create table " + tableName
+ " (col1 int4, col2 int4) partition by column(key time) ").close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
executeString(
"insert overwrite into " + tableName
+ " select l_orderkey, l_partkey " +
" , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " +
" WHEN '1997-01-28' THEN cast ('12:10:20' as time) " +
" WHEN '1994-02-02' THEN cast ('12:10:30' as time) " +
" ELSE cast ('00:00:00' as time) END " +
" from lineitem");
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key time) "
+ " as select l_orderkey, l_partkey " +
" , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " +
" WHEN '1997-01-28' THEN cast ('12:10:20' as time) " +
" WHEN '1994-02-02' THEN cast ('12:10:30' as time) " +
" ELSE cast ('00:00:00' as time) END " +
" from lineitem");
}
assertTrue(client.existTable(tableName));
// LessThanOrEquals
res = executeString("SELECT * FROM " + tableName
+ " WHERE key <= cast('12:10:20' as time) order by col1, col2, key");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,00:00:00\n" +
"1,1,11:20:40\n" +
"2,2,12:10:20\n" +
"3,3,00:00:00\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// LessThan and GreaterThan
res = executeString("SELECT * FROM " + tableName
+ " WHERE key > cast('00:00:00' as time) and " +
"key < cast('12:10:00' as time) order by col1, col2, key desc");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,11:20:40\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
// Between
res = executeString("SELECT * FROM " + tableName
+ " WHERE key between cast('11:00:00' as time) " +
"and cast('13:00:00' as time) order by col1, col2, key desc");
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,11:20:40\n" +
"2,2,12:10:20\n" +
"3,2,12:10:30\n";
assertEquals(expectedResult, resultSetToString(res));
res.close();
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
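// Verifies that a column-partitioned table can be created, loaded, and queried with a database-qualified name.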
@Test
public final void testDatabaseNameIncludeTableName() throws Exception {
executeString("create database test_partition").close();
String databaseName = "test_partition";
String tableName = CatalogUtil.normalizeIdentifier("part");
if (nodeType == NodeType.INSERT) {
executeString(
"create table " + databaseName + "." + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
assertTrue(catalog.existsTable(databaseName, tableName));
assertEquals(2, catalog.getTableDesc(databaseName, tableName).getSchema().size());
assertEquals(3, catalog.getTableDesc(databaseName, tableName).getLogicalSchema().size());
executeString(
"insert overwrite into " + databaseName + "." + tableName + " select l_orderkey, l_partkey, " +
"l_quantity from lineitem");
} else {
executeString(
"create table "+ databaseName + "." + tableName + "(col1 int4, col2 int4) partition by column(key float8) "
+ " as select l_orderkey, l_partkey, l_quantity from lineitem");
}
TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
verifyPartitionDirectoryFromCatalog(databaseName, tableName, new String[]{"key"},
tableDesc.getStats().getNumRows());
ResultSet res = executeString("select * from " + databaseName + "." + tableName + " ORDER BY key");
String result = resultSetToString(res);
String expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,17.0\n" +
"1,1,36.0\n" +
"2,2,38.0\n" +
"3,2,45.0\n" +
"3,3,49.0\n";
res.close();
assertEquals(expectedResult, result);
executeString("DROP TABLE " + databaseName + "." + tableName + " PURGE").close();
executeString("DROP database " + databaseName).close();
}
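// Verifies that scans tolerate abnormal partition directories (unknown values, empty values,
// non-partition column names) as well as partition directories removed directly from the file system.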
@Test
public void testAbnormalDirectories() throws Exception {
ResultSet res = null;
FileSystem fs = FileSystem.get(conf);
Path path = null;
String tableName = CatalogUtil.normalizeIdentifier("testAbnormalDirectories");
if (nodeType == NodeType.INSERT) {
executeString(
"create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ").close();
executeString(
"insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
"l_quantity from lineitem").close();
} else {
executeString(
"create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) "
+ " as select l_orderkey, l_partkey, l_quantity from lineitem").close();
}
TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
tableDesc.getStats().getNumRows());
// Case: partition directories exist only on the file system, without catalog entries.
String externalTableName = "testCreateExternalColumnPartitionedTable";
executeString("create external table " + externalTableName + " (col1 int4, col2 int4) " +
" USING TEXT WITH ('text.delimiter'='|') PARTITION BY COLUMN (key float8) " +
" location '" + tableDesc.getUri().getPath() + "'").close();
res = executeString("SELECT COUNT(*) AS cnt FROM " + externalTableName);
String result = resultSetToString(res);
String expectedResult = "cnt\n" +
"-------------------------------\n" +
"5\n";
res.close();
assertEquals(expectedResult, result);
// Make abnormal directories
path = new Path(tableDesc.getUri().getPath(), "key=100.0");
fs.mkdirs(path);
path = new Path(tableDesc.getUri().getPath(), "key=");
fs.mkdirs(path);
path = new Path(tableDesc.getUri().getPath(), "col1=a");
fs.mkdirs(path);
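// 5 original partition directories (one per distinct l_quantity value) plus the 3 abnormal ones above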
assertEquals(8, fs.listStatus(path.getParent()).length);
res = executeString("SELECT COUNT(*) AS cnt FROM " + externalTableName + " WHERE key > 40.0");
result = resultSetToString(res);
expectedResult = "cnt\n" +
"-------------------------------\n" +
"2\n";
res.close();
assertEquals(expectedResult, result);
// Remove existing partition directory
path = new Path(tableDesc.getUri().getPath(), "key=36.0");
fs.delete(path, true);
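// With key=36.0 removed from the file system, subsequent scans should return only the remaining 4 rows.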
res = executeString("SELECT * FROM " + tableName + " ORDER BY key");
result = resultSetToString(res);
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,17.0\n" +
"2,2,38.0\n" +
"3,2,45.0\n" +
"3,3,49.0\n";
res.close();
assertEquals(expectedResult, result);
res = executeString("SELECT COUNT(*) AS cnt FROM " + tableName + " WHERE key > 30.0");
result = resultSetToString(res);
expectedResult = "cnt\n" +
"-------------------------------\n" +
"3\n";
res.close();
assertEquals(expectedResult, result);
// Sort: copy the partitioned table's rows into a new table via CTAS with ORDER BY and verify the ordering.
String sortedTableName = "sortedPartitionTable";
executeString("create table " + sortedTableName + " AS SELECT * FROM " + tableName
+ " ORDER BY col1, col2 desc").close();
res = executeString("SELECT * FROM " + sortedTableName + " ORDER BY col1, col2 desc;");
result = resultSetToString(res);
expectedResult = "col1,col2,key\n" +
"-------------------------------\n" +
"1,1,17.0\n" +
"2,2,38.0\n" +
"3,3,49.0\n" +
"3,2,45.0\n";
res.close();
assertEquals(expectedResult, result);
executeString("DROP TABLE " + sortedTableName + " PURGE").close();
executeString("DROP TABLE " + externalTableName).close();
executeString("DROP TABLE " + tableName + " PURGE").close();
}
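// Verifies partition pruning with IN predicates on a table partitioned by three columns (col1, col2, col3).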
@Test
public final void testPartitionWithInOperator() throws Exception {
ResultSet res = null;
String tableName = CatalogUtil.normalizeIdentifier("testPartitionWithInOperator");
String result, expectedResult;
if (nodeType == NodeType.INSERT) {
res = testBase.execute(
"create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
res.close();
assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
res = executeString("insert overwrite into " + tableName
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
} else {
res = executeString( "create table " + tableName + " (col4 text) "
+ " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
"l_quantity from lineitem");
}
res.close();
res = executeString("select * from " + tableName);
result = resultSetToString(res);
expectedResult = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,1,1,17.0\n" +
"N,1,1,36.0\n" +
"N,2,2,38.0\n" +
"R,3,2,45.0\n" +
"R,3,3,49.0\n";
res.close();
assertEquals(expectedResult, result);
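// IN predicate on the leading partition column only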
res = executeString("select * from " + tableName + " WHERE col1 in (1, 2) order by col3");
result = resultSetToString(res);
expectedResult = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,1,1,17.0\n" +
"N,1,1,36.0\n" +
"N,2,2,38.0\n";
res.close();
assertEquals(expectedResult, result);
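// IN predicate combined with an equality predicate on the second partition column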
res = executeString("select * from " + tableName + " WHERE col1 in (2, 3) and col2 = 2 order by col3");
result = resultSetToString(res);
expectedResult = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,2,2,38.0\n" +
"R,3,2,45.0\n";
res.close();
assertEquals(expectedResult, result);
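// IN predicates on multiple partition columns combined with a range predicate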
res = executeString("select * from " + tableName + " WHERE col1 in (1, 2, 3) and col2 in (1, 3) and col3 < 30 " +
" order by col3");
result = resultSetToString(res);
expectedResult = "col4,col1,col2,col3\n" +
"-------------------------------\n" +
"N,1,1,17.0\n";
res.close();
assertEquals(expectedResult, result);
executeString("DROP TABLE " + tableName + " PURGE").close();
res.close();
}
}