// blob: 1007642a5f5e7a656e4dd29e3ef5410c2401a06d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.client;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.annotation.NotThreadSafe;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.UndefinedTablespaceException;
import org.apache.tajo.exception.UndefinedTablespaceHandlerException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
import org.apache.tajo.ipc.ClientProtos.StageHistoryProto;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.powermock.reflect.Whitebox;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import static org.junit.Assert.*;
@Category(IntegrationTest.class)
@NotThreadSafe
public class TestTajoClient {
// Testing cluster shared by all tests; assigned in setUp().
private static TajoTestingCluster cluster;
// Configuration taken from the shared cluster in setUp().
private static TajoConf conf;
// Client shared by all test methods; created in setUp() and closed in tearDown().
private static TajoClient client;
// Test directory under which writeTmpTable() places temporary table data.
private static Path testDir;
/**
 * Initializes the fixtures shared by every test in this class: the testing cluster,
 * its configuration, a client connected to it and the test directory.
 */
@BeforeClass
public static void setUp() throws Exception {
  final TajoTestingCluster testingCluster = TpchTestBase.getInstance().getTestingCluster();
  cluster = testingCluster;
  conf = testingCluster.getConfiguration();
  client = testingCluster.newTajoClient();
  testDir = CommonTestingUtil.getTestDir();
}
/**
 * Closes the shared client after all tests have run.
 *
 * The null check matters when setUp() failed before the client was created:
 * without it this method would throw a NullPointerException on top of the
 * original failure.
 */
@AfterClass
public static void tearDown() throws Exception {
  if (client != null) {
    client.close();
  }
}
/**
 * Writes temporary table data beneath the shared test directory.
 *
 * @param tableName name of the sub-path (under {@code testDir}) to write to
 * @return the path the data was written to
 * @throws IOException if writing the temporary data fails
 */
private static Path writeTmpTable(String tableName) throws IOException {
  final Path location = StorageUtil.concatPath(testDir, tableName);
  BackendTestingUtil.writeTmpTable(conf, location);
  return location;
}
/**
 * Creates ten databases one by one and then drops them again, verifying after each
 * step that existDatabase() and getAllDatabaseNames() reflect the change.
 */
@Test
public final void testCreateAndDropDatabases() throws TajoException {
  final int initialCount = client.getAllDatabaseNames().size();
  final String namePrefix = CatalogUtil.normalizeIdentifier("testCreateDatabase_");

  for (int created = 0; created < 10; created++) {
    final String dbName = namePrefix + created;

    // the listing must contain exactly the databases created so far
    assertEquals(initialCount + created, client.getAllDatabaseNames().size());

    // the database must not exist before it is created ...
    assertFalse(client.existDatabase(dbName));
    client.createDatabase(dbName);
    // ... and must exist afterwards
    assertTrue(client.existDatabase(dbName));

    // the listing grows by one and includes the new name
    assertEquals(initialCount + created + 1, client.getAllDatabaseNames().size());
    assertTrue(client.getAllDatabaseNames().contains(dbName));
  }

  // drop every database again, checking existDatabase() and getAllDatabaseNames()
  for (int dropped = 0; dropped < 10; dropped++) {
    final String dbName = namePrefix + dropped;

    assertTrue(client.existDatabase(dbName));
    assertTrue(client.getAllDatabaseNames().contains(dbName));
    client.dropDatabase(dbName);
    assertFalse(client.existDatabase(dbName));
    assertFalse(client.getAllDatabaseNames().contains(dbName));
  }

  // we must end up with the same number of databases we started with
  assertEquals(initialCount, client.getAllDatabaseNames().size());
}
/**
 * Verifies that creating a database does not change the current database, and that
 * selectDatabase() does.
 */
@Test
public final void testCurrentDatabase() throws IOException, TajoException, InterruptedException {
  final int initialCount = client.getAllDatabaseNames().size();
  final String defaultDb = TajoConstants.DEFAULT_DATABASE_NAME;
  assertEquals(defaultDb, client.getCurrentDatabase());

  final String newDb = CatalogUtil.normalizeIdentifier("testcurrentdatabase");
  client.createDatabase(newDb);
  assertEquals(initialCount + 1, client.getAllDatabaseNames().size());
  // creating a database must not switch to it
  assertEquals(defaultDb, client.getCurrentDatabase());

  client.selectDatabase(newDb);
  assertEquals(newDb, client.getCurrentDatabase());

  // switch back before dropping the database
  client.selectDatabase(defaultDb);
  client.dropDatabase(newDb);
  assertEquals(initialCount, client.getAllDatabaseNames().size());
}
/**
 * Verifies that selecting a database that does not exist is rejected with an exception
 * and leaves the number of databases unchanged.
 *
 * The outcome is recorded in a flag and asserted outside the try block. Asserting inside
 * a {@code catch (Throwable)} block would swallow the AssertionError and let the test
 * pass even when no exception was raised.
 */
@Test
public final void testSelectDatabaseToInvalidOne() throws IOException, TajoException, InterruptedException {
  int currentNum = client.getAllDatabaseNames().size();
  assertFalse(client.existDatabase("invaliddatabase"));

  boolean rejected = false;
  try {
    client.selectDatabase("invaliddatabase");
  } catch (Exception expected) {
    // any exception counts as a rejection; the concrete type is not asserted here
    rejected = true;
  }
  assertTrue("selecting a non-existent database must fail", rejected);

  assertEquals(currentNum, client.getAllDatabaseNames().size());
}
/**
 * Verifies that the database currently selected by the session cannot be dropped,
 * and that it can be dropped once another database has been selected.
 *
 * The rejection is recorded in a flag and asserted outside the inner try block so that
 * a missing exception actually fails the test. The default database is re-selected in a
 * finally block so the shared client is not left pointing at the test database.
 */
@Test
public final void testDropCurrentDatabase() throws IOException, TajoException, InterruptedException {
  int currentNum = client.getAllDatabaseNames().size();
  String databaseName = CatalogUtil.normalizeIdentifier("testdropcurrentdatabase");
  client.createDatabase(databaseName);
  client.selectDatabase(databaseName);

  try {
    assertEquals(databaseName, client.getCurrentDatabase());

    boolean dropRejected = false;
    try {
      client.dropDatabase(databaseName);
    } catch (Exception expected) {
      // any exception counts as a rejection; the concrete type is not asserted here
      dropRejected = true;
    }
    assertTrue("dropping the current database must fail", dropRejected);
  } finally {
    client.selectDatabase(TajoConstants.DEFAULT_DATABASE_NAME);
  }

  client.dropDatabase(databaseName);
  assertEquals(currentNum, client.getAllDatabaseNames().size());
}
/**
 * Exercises updateSessionVariables(), unsetSessionVariables(), existSessionVariable()
 * and getAllSessionVariables() with ten generated key/value pairs.
 */
@Test
public final void testSessionVariables() throws IOException, TajoException, InterruptedException {
  final String keyPrefix = "key_";
  final String valuePrefix = "val_";

  // start by asking the server to unset every variable that is currently reported
  final List<String> reportedKeys = new ArrayList<>(client.getAllSessionVariables().keySet());
  client.unsetSessionVariables(reportedKeys);

  for (int n = 0; n < 10; n++) {
    final String key = keyPrefix + n;
    final String value = valuePrefix + n;

    // the assertions expect four variables to still be reported after the unset above
    assertEquals(n + 4, client.getAllSessionVariables().size());
    assertFalse(client.getAllSessionVariables().containsKey(key));
    assertFalse(client.existSessionVariable(key));

    final Map<String, String> update = new HashMap<>();
    update.put(key, value);
    client.updateSessionVariables(update);

    assertEquals(n + 5, client.getAllSessionVariables().size());
    assertTrue(client.getAllSessionVariables().containsKey(key));
    assertTrue(client.existSessionVariable(key));
  }

  final int countBeforeUnset = client.getAllSessionVariables().size();

  for (int n = 0; n < 10; n++) {
    final String key = keyPrefix + n;

    assertTrue(client.getAllSessionVariables().containsKey(key));
    assertTrue(client.existSessionVariable(key));

    client.unsetSessionVariables(Lists.newArrayList(key));

    assertFalse(client.getAllSessionVariables().containsKey(key));
    assertFalse(client.existSessionVariable(key));
  }

  // all ten generated variables must be gone again
  assertEquals(countBeforeUnset - 10, client.getAllSessionVariables().size());
}
/**
 * Submits a query, kills it after one second and verifies that its state is
 * reported as QUERY_KILLED.
 */
@Test
public final void testKillQuery() throws IOException, TajoException, InterruptedException {
  final String slowQuery = "select sleep(1) from lineitem where l_orderkey > 0";
  final ClientProtos.SubmitQueryResponse submitted = client.executeQuery(slowQuery);

  // pause before issuing the kill (presumably so the query has started running; confirm)
  Thread.sleep(1000);

  final QueryId target = new QueryId(submitted.getQueryId());
  client.killQuery(target);

  final QueryState stateAfterKill = client.getQueryStatus(target).getState();
  assertEquals(TajoProtos.QueryState.QUERY_KILLED, stateAfterKill);
}
/**
 * Creates an external table through updateQuery() and removes it again with dropTable(),
 * checking existTable() before and after each step.
 */
@Test
public final void testUpdateQuery() throws IOException, TajoException {
  final String tableName = CatalogUtil.normalizeIdentifier("testUpdateQuery");
  final Path dataPath = writeTmpTable(tableName);
  assertFalse(client.existTable(tableName));

  final String createSql = String.format(
      "create external table %s (deptname text, score integer) using csv location '%s'",
      tableName, dataPath);
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  client.dropTable(tableName);
  assertFalse(client.existTable(tableName));
}
/**
 * Registers an external table via createExternalTable(), drops it without purging and
 * verifies that the table data is still present on the file system afterwards.
 */
@Test
public final void testCreateAndDropExternalTable() throws IOException, TajoException {
  final String tableName = "testCreateAndDropExternalTable";
  final Path dataPath = writeTmpTable(tableName);
  LOG.error("Full path:" + dataPath.toUri().getRawPath());

  // the data must exist before the table is registered
  assertTrue(dataPath.getFileSystem(conf).exists(dataPath));
  assertFalse(client.existTable(tableName));

  client.createExternalTable(
      tableName, BackendTestingUtil.mockupSchema, dataPath.toUri(), BackendTestingUtil.mockupMeta);
  assertTrue(client.existTable(tableName));

  client.dropTable(tableName);
  assertFalse(client.existTable(tableName));

  // a plain drop is expected to leave the data in place
  assertTrue(dataPath.getFileSystem(conf).exists(dataPath));
}
/**
 * Registers an external table via createExternalTable(), drops it with the purge flag
 * set and verifies that the table data has been removed from the file system.
 */
@Test
public final void testCreateAndPurgeExternalTable() throws IOException, TajoException {
  final String tableName = "testCreateAndPurgeExternalTable";
  final Path dataPath = writeTmpTable(tableName);
  LOG.error("Full path:" + dataPath.toUri().getRawPath());

  // the data must exist before the table is registered
  assertTrue(dataPath.getFileSystem(conf).exists(dataPath));
  assertFalse(client.existTable(tableName));

  client.createExternalTable(
      tableName, BackendTestingUtil.mockupSchema, dataPath.toUri(), BackendTestingUtil.mockupMeta);
  assertTrue(client.existTable(tableName));

  // second argument requests a purge
  client.dropTable(tableName, true);
  assertFalse(client.existTable(tableName));

  final boolean dataStillThere = dataPath.getFileSystem(conf).exists(dataPath);
  assertFalse("Checking if table data exists", dataStillThere);
}
/**
 * Creates an external table with a CREATE EXTERNAL TABLE statement, drops it with a
 * DROP TABLE statement (no purge) and verifies that the table is gone from the catalog
 * while its data is still present on the local file system.
 */
@Test
public final void testCreateAndDropExternalTableByExecuteQuery() throws IOException, TajoException {
  TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = CatalogUtil.normalizeIdentifier("testCreateAndDropExternalTableByExecuteQuery");
  Path tablePath = writeTmpTable(tableName);
  assertFalse(client.existTable(tableName));

  String sql = "create external table " + tableName + " (deptname text, score int4) " + "using csv location '"
      + tablePath + "'";
  client.executeQueryAndGetResult(sql);
  assertTrue(client.existTable(tableName));

  client.updateQuery("drop table " + tableName);
  // check the table that was actually dropped, not the literal string "tableName"
  assertFalse(client.existTable(tableName));

  // a plain drop is expected to leave the data in place
  FileSystem localFS = FileSystem.getLocal(clusterConf);
  assertTrue(localFS.exists(tablePath));
}
/**
 * Creates an external table with a CREATE EXTERNAL TABLE statement, drops it with
 * DROP TABLE ... PURGE and verifies that both the catalog entry and the data are gone.
 */
@Test
public final void testCreateAndPurgeExternalTableByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = CatalogUtil.normalizeIdentifier("testCreateAndPurgeExternalTableByExecuteQuery");
  final Path dataPath = writeTmpTable(tableName);
  assertFalse(client.existTable(tableName));

  final String createSql = String.format(
      "create external table %s (deptname text, score int4) using csv location '%s'",
      tableName, dataPath);
  client.executeQueryAndGetResult(createSql);
  assertTrue(client.existTable(tableName));

  final String purgeSql = "drop table " + tableName + " purge";
  client.updateQuery(purgeSql);
  assertFalse(client.existTable(tableName));

  // purge is expected to remove the data as well
  assertFalse(FileSystem.getLocal(clusterConf).exists(dataPath));
}
/**
 * Creates a table with a CREATE TABLE statement, drops it without purge and
 * verifies that the table's storage path still exists afterwards.
 */
@Test
public final void testCreateAndDropTableByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = CatalogUtil.normalizeIdentifier("testCreateAndDropTableByExecuteQuery");
  assertFalse(client.existTable(tableName));

  final String createSql = "create table " + tableName + " (deptname text, score int4)";
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  // resolve where the catalog says the table lives
  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  final String dropSql = "drop table " + tableName;
  client.updateQuery(dropSql);
  assertFalse(client.existTable(tableName));
  // a plain drop is expected to leave the storage path in place
  assertTrue(fileSystem.exists(storagePath));
}
/**
 * Creates a table with a CREATE TABLE statement, drops it with DROP TABLE ... PURGE
 * and verifies that the table's storage path has been removed.
 */
@Test
public final void testCreateAndPurgeTableByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = CatalogUtil.normalizeIdentifier("testCreateAndPurgeTableByExecuteQuery");
  assertFalse(client.existTable(tableName));

  final String createSql = "create table " + tableName + " (deptname text, score int4)";
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  // resolve where the catalog says the table lives
  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  final String purgeSql = "drop table " + tableName + " purge";
  client.updateQuery(purgeSql);
  assertFalse(client.existTable(tableName));
  // purge is expected to remove the storage path as well
  assertFalse(fileSystem.exists(storagePath));
}
/**
 * Verifies that a DDL statement submitted through executeQueryAndGetResult()
 * registers the table in the catalog.
 */
@Test
public final void testDDLByExecuteQuery() throws IOException, TajoException {
  final String tableName = CatalogUtil.normalizeIdentifier("testDDLByExecuteQuery");
  final Path dataPath = writeTmpTable(tableName);
  assertFalse(client.existTable(tableName));

  final String createSql = String.format(
      "create external table %s (deptname text, score int4) using csv location '%s'",
      tableName, dataPath);
  client.executeQueryAndGetResult(createSql);

  assertTrue(client.existTable(tableName));
}
/**
 * Creates two tables and verifies that both appear in the result of getTableList(null).
 */
@Test
public final void testGetTableList() throws IOException, TajoException {
  final String firstTable = "GetTableList1".toLowerCase();
  final String secondTable = "GetTableList2".toLowerCase();

  assertFalse(client.existTable(firstTable));
  assertFalse(client.existTable(secondTable));

  // the tables are created with mixed-case names but looked up by their lower-case form
  client.updateQuery("create table GetTableList1 (age int, name text);");
  client.updateQuery("create table GetTableList2 (age int, name text);");

  assertTrue(client.existTable(firstTable));
  assertTrue(client.existTable(secondTable));

  final Set<String> listedTables = Sets.newHashSet(client.getTableList(null));
  assertTrue(listedTables.contains(firstTable));
  assertTrue(listedTables.contains(secondTable));
}
/** Logger the tests use to report the location of temporary table data. */
private static final Log LOG = LogFactory.getLog(TestTajoClient.class);
/**
 * Registers an external table and verifies that getTableDesc() returns a descriptor
 * with the fully qualified table name and a positive byte count in its statistics.
 */
@Test
public final void testGetTableDesc() throws IOException, TajoException {
  final String normalizedName = CatalogUtil.normalizeIdentifier("table3");
  final Path dataPath = writeTmpTable(normalizedName);
  LOG.error("Full path:" + dataPath.toUri().getRawPath());

  final FileSystem fileSystem = dataPath.getFileSystem(conf);
  assertTrue(fileSystem.exists(dataPath));
  assertNotNull(dataPath);
  assertFalse(client.existTable(normalizedName));

  client.createExternalTable(
      "table3", BackendTestingUtil.mockupSchema, dataPath.toUri(), BackendTestingUtil.mockupMeta);
  assertTrue(client.existTable(normalizedName));

  final TableDesc descriptor = client.getTableDesc(normalizedName);
  assertNotNull(descriptor);

  // the descriptor is expected to carry the name qualified with the default database
  final String qualifiedName = CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, normalizedName);
  assertEquals(qualifiedName, descriptor.getName());
  assertTrue(descriptor.getStats().getNumBytes() > 0);
}
/**
 * Disabled test (the annotation below is commented out): creates a hash-partitioned table
 * with three named partitions, drops it without purge and checks that its storage path
 * still exists.
 */
//@Test
public final void testCreateAndDropTablePartitionedHash1ByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = "testCreateAndDropTablePartitionedHash1ByExecuteQuery";
  assertFalse(client.existTable(tableName));

  final String createSql = new StringBuilder()
      .append("create table ").append(tableName).append(" (deptname text, score int4)")
      .append(" PARTITION BY HASH (deptname)")
      .append(" (PARTITION sub_part1, PARTITION sub_part2, PARTITION sub_part3)")
      .toString();
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  client.updateQuery("drop table " + tableName);
  assertFalse(client.existTable(tableName));
  // a plain drop is expected to leave the storage path in place
  assertTrue(fileSystem.exists(storagePath));
}
/**
 * Disabled test (the annotation below is commented out): creates a hash-partitioned table
 * with three named partitions, drops it with purge and checks that its storage path is gone.
 */
//@Test
public final void testCreateAndPurgeTablePartitionedHash1ByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = "testCreateAndPurgeTablePartitionedHash1ByExecuteQuery";
  assertFalse(client.existTable(tableName));

  final String createSql = new StringBuilder()
      .append("create table ").append(tableName).append(" (deptname text, score int4)")
      .append(" PARTITION BY HASH (deptname)")
      .append(" (PARTITION sub_part1, PARTITION sub_part2, PARTITION sub_part3)")
      .toString();
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  client.updateQuery("drop table " + tableName + " purge");
  assertFalse(client.existTable(tableName));
  // purge is expected to remove the storage path as well
  assertFalse(fileSystem.exists(storagePath));
}
/**
 * Disabled test (the annotation below is commented out): creates a hash-partitioned table
 * using the "PARTITIONS 2" form, drops it with purge and checks that its storage path is gone.
 */
//@Test
public final void testCreateAndDropTablePartitionedHash2ByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = "testCreateAndDropTablePartitionedHash2ByExecuteQuery";
  assertFalse(client.existTable(tableName));

  // the fragments are joined without separating spaces, exactly as the statement is submitted
  final String createSql = new StringBuilder()
      .append("create table ").append(tableName).append(" (deptname text, score int4)")
      .append("PARTITION BY HASH (deptname)")
      .append("PARTITIONS 2")
      .toString();
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  client.updateQuery("drop table " + tableName + " purge");
  assertFalse(client.existTable(tableName));
  assertFalse(fileSystem.exists(storagePath));
}
/**
 * Disabled test (the annotation below is commented out): creates a list-partitioned table,
 * drops it with purge and checks that its storage path is gone.
 */
//@Test
public final void testCreateAndDropTablePartitionedListByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = "testCreateAndDropTablePartitionedListByExecuteQuery";
  assertFalse(client.existTable(tableName));

  // the fragments are joined without separating spaces, exactly as the statement is submitted
  final String createSql = new StringBuilder()
      .append("create table ").append(tableName).append(" (deptname text, score int4)")
      .append("PARTITION BY LIST (deptname)")
      .append("( PARTITION sub_part1 VALUES('r&d', 'design'),")
      .append("PARTITION sub_part2 VALUES('sales', 'hr') )")
      .toString();
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  client.updateQuery("drop table " + tableName + " purge");
  assertFalse(client.existTable(tableName));
  assertFalse(fileSystem.exists(storagePath));
}
/**
 * Disabled test (the annotation below is commented out): creates a range-partitioned table,
 * drops it with purge and checks that its storage path is gone.
 */
//@Test
public final void testCreateAndDropTablePartitionedRangeByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = "testCreateAndDropTablePartitionedRangeByExecuteQuery";
  assertFalse(client.existTable(tableName));

  // the fragments are joined without separating spaces, exactly as the statement is submitted
  final String createSql = new StringBuilder()
      .append("create table ").append(tableName).append(" (deptname text, score int4)")
      .append("PARTITION BY RANGE (score)")
      .append("( PARTITION sub_part1 VALUES LESS THAN (2),")
      .append("PARTITION sub_part2 VALUES LESS THAN (5),")
      .append("PARTITION sub_part2 VALUES LESS THAN (MAXVALUE) )")
      .toString();
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  client.updateQuery("drop table " + tableName + " purge");
  assertFalse(client.existTable(tableName));
  assertFalse(fileSystem.exists(storagePath));
}
/**
 * Verifies that CREATE TABLE statements using RANGE, LIST or HASH partitioning are
 * rejected with an UnsupportedException.
 */
@Test
public final void testFailCreateTablePartitionedOtherExceptColumn() throws IOException, TajoException {
  final String tableName = "testFailCreateTablePartitionedOtherExceptColumn";
  assertFalse(client.existTable(tableName));

  // common head of all three statements
  final String createHead = "create table " + tableName + " (deptname text, score int4)";

  final String rangeSql = createHead
      + "PARTITION BY RANGE (score)"
      + "( PARTITION sub_part1 VALUES LESS THAN (2),"
      + "PARTITION sub_part2 VALUES LESS THAN (5),"
      + "PARTITION sub_part2 VALUES LESS THAN (MAXVALUE) )";
  try {
    client.updateQuery(rangeSql);
    fail();
  } catch (UnsupportedException expected) {
    // range partitioning is expected to be rejected
  }

  final String listSql = createHead
      + "PARTITION BY LIST (deptname)"
      + "( PARTITION sub_part1 VALUES('r&d', 'design'),"
      + "PARTITION sub_part2 VALUES('sales', 'hr') )";
  try {
    assertFalse(client.updateQuery(listSql));
    fail();
  } catch (UnsupportedException expected) {
    // list partitioning is expected to be rejected
  }

  final String hashSql = createHead
      + "PARTITION BY HASH (deptname)"
      + "PARTITIONS 2";
  try {
    assertFalse(client.updateQuery(hashSql));
    fail();
  } catch (UnsupportedException expected) {
    // hash partitioning is expected to be rejected
  }
}
/**
 * Creates a column-partitioned table, drops it with purge and verifies that both the
 * catalog entry and the storage path are gone.
 */
@Test
public final void testCreateAndDropTablePartitionedColumnByExecuteQuery() throws IOException, TajoException {
  final TajoConf clusterConf = cluster.getConfiguration();
  final String tableName = CatalogUtil.normalizeIdentifier("testCreateAndDropTablePartitionedColumnByExecuteQuery");
  assertFalse(client.existTable(tableName));

  // the fragments are joined without a separating space, exactly as the statement is submitted
  final String createSql = "create table " + tableName + " (deptname text, score int4)"
      + "PARTITION BY COLUMN (key1 text)";
  client.updateQuery(createSql);
  assertTrue(client.existTable(tableName));

  final Path storagePath = new Path(client.getTableDesc(tableName).getUri());
  final FileSystem fileSystem = storagePath.getFileSystem(clusterConf);
  assertTrue(fileSystem.exists(storagePath));

  client.updateQuery("drop table " + tableName + " purge");
  assertFalse(client.existTable(tableName));
  assertFalse(fileSystem.exists(storagePath));
}
/**
 * Compares the result of getFunctions() with the function list held by the master's
 * catalog: for a name that matches, a name that does not, and a null name.
 */
@Test
public final void testGetFunctions() throws IOException, TajoException {
  final Collection<FunctionDesc> catalogFunctions = cluster.getMaster().getCatalog().getFunctions();
  final String targetName = "sum";

  // count the catalog entries whose name equals the target name
  int expectedMatches = 0;
  final Iterator<FunctionDesc> cursor = catalogFunctions.iterator();
  while (cursor.hasNext()) {
    final FunctionDesc candidate = cursor.next();
    if (targetName.equals(candidate.getFunctionName())) {
      expectedMatches++;
    }
  }

  final List<CatalogProtos.FunctionDescProto> matching = client.getFunctions(targetName);
  assertEquals(expectedMatches, matching.size());

  final List<CatalogProtos.FunctionDescProto> none = client.getFunctions("notmatched");
  assertEquals(0, none.size());

  // a null name is expected to return as many functions as the catalog holds
  final List<CatalogProtos.FunctionDescProto> everything = client.getFunctions(null);
  assertEquals(catalogFunctions.size(), everything.size());
}
/**
 * Runs the same SELECT twice and verifies that the finished-query list grows by two.
 *
 * Both result sets are managed by try-with-resources so that neither is leaked: the
 * first one is no longer overwritten without being closed, and both are closed even
 * when an assertion fails. They are closed only after the size assertion has run.
 */
@Test
public final void testGetFinishedQueryList() throws SQLException, TajoException {
  final String tableName = CatalogUtil.normalizeIdentifier("testGetFinishedQueryList");
  String sql = "create table " + tableName + " (deptname text, score int4)";
  client.updateQuery(sql);
  assertTrue(client.existTable(tableName));

  int numFinishedQueries = client.getFinishedQueryList().size();
  final String selectSql = "select * from " + tableName + " order by deptname";

  try (ResultSet first = client.executeQueryAndGetResult(selectSql)) {
    assertNotNull(first);
    try (ResultSet second = client.executeQueryAndGetResult(selectSql)) {
      assertNotNull(second);
      assertEquals(numFinishedQueries + 2, client.getFinishedQueryList().size());
    }
  }
}
/**
 * Checks that the status and the result of a query can still be fetched once the query
 * shows up in the finished-query list. Query statuses should stay available for some
 * time regardless of the state the query ended in.
 */
@Test(timeout = 20 * 1000)
public final void testGetQueryStatusAndResultAfterFinish() throws Exception {
  final String selectSql = "select * from lineitem order by l_orderkey";
  final ClientProtos.SubmitQueryResponse submitted = client.executeQuery(selectSql);
  assertNotNull(submitted);
  final QueryId queryId = new QueryId(submitted.getQueryId());

  try {
    // poll the finished-query list every 100 ms until our query appears in it;
    // the test timeout bounds this loop
    boolean listedAsFinished = false;
    while (!listedAsFinished) {
      Thread.sleep(100);
      final List<ClientProtos.BriefQueryInfo> finishedList = client.getFinishedQueryList();
      if (finishedList == null) {
        continue;
      }
      for (ClientProtos.BriefQueryInfo info : finishedList) {
        if (info.getQueryId().equals(queryId.getProto())) {
          listedAsFinished = true;
          break;
        }
      }
    }

    final QueryStatus status = client.getQueryStatus(queryId);
    assertNotNull(status);
    assertTrue(TajoClientUtil.isQueryComplete(status.getState()));

    final ResultSet rows = client.getQueryResult(queryId);
    assertNotNull(rows);

    int rowCount = 0;
    while (rows.next()) {
      rowCount += 1;
    }
    assertEquals(8, rowCount);
  } finally {
    client.closeQuery(queryId);
  }
}
/**
 * Sets the NULL_CHAR session variable, runs a CREATE TABLE AS SELECT and verifies
 * (via assertNullCharSessionVar) that the resulting table reflects that setting.
 *
 * The session variable is unset again in a finally block: the client is shared by all
 * tests in this class, so leaving it set would leak this test's setting into the others.
 */
@Test
public void testNullCharSessionInCTAS() throws Exception {
  String sql =
      "create table nullcharsession as select\n" +
      " c_custkey,\n" +
      " orders.o_orderkey,\n" +
      " orders.o_orderstatus \n" +
      "from\n" +
      " orders full outer join customer on c_custkey = o_orderkey\n" +
      "order by\n" +
      " c_custkey,\n" +
      " orders.o_orderkey;\n";

  final String nullCharKey = SessionVars.NULL_CHAR.keyname();
  Map<String, String> variables = new HashMap<>();
  variables.put(nullCharKey, "\\\\T");
  client.updateSessionVariables(variables);

  try {
    ResultSet res = client.executeQueryAndGetResult(sql);
    res.close();

    TableDesc resultDesc = client.getTableDesc("nullcharsession");
    assertNullCharSessionVar(resultDesc);
  } finally {
    // restore the shared session for the remaining tests
    client.unsetSessionVariables(Lists.newArrayList(nullCharKey));
  }
}
/**
 * Asserts that a table produced with the NULL_CHAR session variable set carries that
 * null marker in its meta properties and in the data written to its single output file.
 *
 * @param resultDesc descriptor of the table to inspect
 * @throws Exception if the table's files cannot be listed or read
 */
public void assertNullCharSessionVar(TableDesc resultDesc) throws Exception {
  TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration();

  // expected value first, so a failure message reports expected/actual the right way round
  assertEquals("\\\\T", resultDesc.getMeta().getProperty(StorageConstants.TEXT_NULL));

  Path path = new Path(resultDesc.getUri());
  FileSystem fs = path.getFileSystem(tajoConf);
  FileStatus[] files = fs.listStatus(path);
  assertNotNull(files);
  assertEquals(1, files.length);

  byte[] buf = new byte[1024];
  int readBytes;
  // close the stream even if the read fails
  try (InputStream in = fs.open(files[0].getPath())) {
    readBytes = in.read(buf);
  }
  assertTrue(readBytes > 0);

  // text type field's value is replaced with \T
  String expected = "1|1|O\n" +
      "2|2|O\n" +
      "3|3|F\n" +
      "4||\\T\n" +
      "5||\\T\n" +
      "||\\T\n" +
      "||\\T\n" +
      "||\\T\n" +
      "||\\T\n" +
      "||\\T\n" +
      "||\\T\n";
  String resultDatas = new String(buf, 0, readBytes);
  assertEquals(expected, resultDatas);
}
/**
 * Runs a count query, waits until getQueryInfo() reports it as succeeded and then checks
 * the query history: two stage histories with the expected read/write row counts.
 */
@Test(timeout = 30000)
public void testGetQueryInfoAndHistory() throws Exception {
  final String countSql = "select count(*) from lineitem";
  final ClientProtos.SubmitQueryResponse submitted = client.executeQuery(countSql);
  assertNotNull(submitted);
  final QueryId queryId = new QueryId(submitted.getQueryId());

  // poll every 100 ms until the query is reported as succeeded; the test timeout bounds this loop
  QueryInfoProto info = client.getQueryInfo(queryId);
  while (info == null || info.getQueryState() != QueryState.QUERY_SUCCEEDED) {
    Thread.sleep(100);
    info = client.getQueryInfo(queryId);
  }
  assertNotNull(info);
  assertEquals(queryId.toString(), info.getQueryId());

  final QueryHistoryProto history = client.getQueryHistory(queryId);
  assertNotNull(history);
  assertEquals(queryId.toString(), history.getQueryId());
  assertEquals(2, history.getStageHistoriesCount());

  // copy into a mutable list and order the stages by execution block id before checking them
  final Comparator<StageHistoryProto> byExecutionBlockId = new Comparator<StageHistoryProto>() {
    @Override
    public int compare(StageHistoryProto left, StageHistoryProto right) {
      return left.getExecutionBlockId().compareTo(right.getExecutionBlockId());
    }
  };
  final List<StageHistoryProto> stages = new ArrayList<>(history.getStageHistoriesList());
  Collections.sort(stages, byExecutionBlockId);

  final StageHistoryProto firstStage = stages.get(0);
  assertEquals(8, firstStage.getTotalReadRows());
  assertEquals(1, firstStage.getTotalWriteRows());

  final StageHistoryProto secondStage = stages.get(1);
  assertEquals(1, secondStage.getTotalReadRows());
  assertEquals(1, secondStage.getTotalWriteRows());
}
/**
 * Verifies that two independently created clients do not share an RPC event loop and
 * that each client's event loop is terminated once the client has been closed.
 *
 * Both clients are closed in a finally block so they are released even when one of the
 * assertions fails.
 */
@Test
public void testClientRPCInterference() throws Exception {
  TajoClient client1 = cluster.newTajoClient();
  TajoClient client2 = cluster.newTajoClient();

  NettyClientBase rpcClient1;
  NettyClientBase rpcClient2;
  try {
    rpcClient1 = Whitebox.getInternalState(client1, NettyClientBase.class);
    assertNotNull(rpcClient1);
    rpcClient2 = Whitebox.getInternalState(client2, NettyClientBase.class);
    // check the second client's RPC client, not the first one a second time
    assertNotNull(rpcClient2);

    assertNotEquals(rpcClient1.getChannel().eventLoop(), rpcClient2.getChannel().eventLoop());
  } finally {
    try {
      client1.close();
    } finally {
      client2.close();
    }
  }

  // wait for each event loop to finish shutting down before checking its state
  rpcClient1.getChannel().eventLoop().terminationFuture().sync();
  assertTrue(rpcClient1.getChannel().eventLoop().isTerminated());
  rpcClient2.getChannel().eventLoop().terminationFuture().sync();
  assertTrue(rpcClient2.getChannel().eventLoop().isTerminated());
}
/**
 * Expects an UndefinedTablespaceException when a table is created on a tablespace
 * named "unknown123".
 */
@Test(expected = UndefinedTablespaceException.class)
public void testCreateTableOnAbsentTablespace() throws TajoException {
  final String createSql =
      "CREATE TABLE testCreateTableOnAbsentTablespace (AGE INT) TABLESPACE unknown123";
  client.updateQuery(createSql);
}
/**
 * Expects an UndefinedTablespaceHandlerException when an external table is created at a
 * location that uses the "hdfx" URI scheme.
 */
@Test(expected = UndefinedTablespaceHandlerException.class)
public void testCreateTableWithAbsentTablespaceHandler() throws TajoException {
  final String createSql =
      "CREATE EXTERNAL TABLE testCreateTableWithAbsentTablespaceHandler (AGE INT) USING TEXT LOCATION 'hdfx://tajo'";
  client.updateQuery(createSql);
}
}