/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.query;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.schema.IdentifierUtil;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.ResultSet;
import java.util.List;
import static org.junit.Assert.*;
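/**
 * Integration tests for INSERT INTO and INSERT OVERWRITE statements, covering plain,
 * partitioned, and Parquet tables, explicit LOCATION targets, compression codecs,
 * UNION sources, and fixed-size CHAR handling.
 */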
@Category(IntegrationTest.class)
public class TestInsertQuery extends QueryTestCaseBase {
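// Basic INSERT OVERWRITE into table1; the catalog should report 8 rows (checked only with the built-in catalog store).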
@Test
public final void testInsertOverwrite() throws Exception {
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
res = executeFile("testInsertOverwrite.sql");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
executeString("DROP TABLE table1 PURGE");
}
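// INSERT INTO appends to existing data: expects two part-* files and the original 8 rows duplicated in the table contents.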
@Test
public final void testInsertInto() throws Exception {
// create table and upload test data
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
res = executeFile("testInsertOverwrite.sql");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
res = executeFile("testInsertInto.sql");
res.close();
List<Path> dataFiles = listTableFiles("table1");
assertEquals(2, dataFiles.size());
for (int i = 0; i < dataFiles.size(); i++) {
String name = dataFiles.get(i).getName();
assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
String[] tokens = name.split("-");
assertEquals(4, tokens.length);
assertEquals(i, Integer.parseInt(tokens[3]));
}
String tableDatas = getTableFileContents("table1");
String expected = "1|1|17.0\n" +
"1|1|36.0\n" +
"2|2|38.0\n" +
"3|2|45.0\n" +
"3|3|49.0\n" +
"||\n" +
"||\n" +
"||\n" +
"1|1|17.0\n" +
"1|1|36.0\n" +
"2|2|38.0\n" +
"3|2|45.0\n" +
"3|3|49.0\n" +
"||\n" +
"||\n" +
"||\n";
assertNotNull(tableDatas);
assertEquals(expected, tableDatas);
executeString("DROP TABLE table1 PURGE");
}
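// INSERT INTO LOCATION, exercised once with a path on the cluster's DFS and once with a local test directory; the shared checks live in assertTestInsertIntoLocation.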
@Test
public final void testInsertIntoLocation() throws Exception {
Path dfsPath = new Path("/tajo-data/testInsertIntoLocation");
assertTestInsertIntoLocation(dfsPath);
}
@Test
public final void testInsertIntoLocationDifferentFSs() throws Exception {
Path localPath = CommonTestingUtil.getTestDir();
assertTestInsertIntoLocation(localPath);
}
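// Shared checks: every INSERT INTO LOCATION run adds one part-* file under the target path and appends its rows; the path is removed afterwards.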
public final void assertTestInsertIntoLocation(Path path) throws Exception {
FileSystem fs = null;
try {
executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close();
String resultFileData = getTableFileContents(path);
String expected = "1|1|1\n" +
"1|1|2\n" +
"2|2|1\n" +
"3|2|1\n" +
"3|3|2\n" +
"||\n" +
"||\n" +
"||\n";
assertEquals(expected, resultFileData);
fs = path.getFileSystem(testingCluster.getConfiguration());
FileStatus[] files = fs.listStatus(path);
assertNotNull(files);
assertEquals(1, files.length);
for (FileStatus eachFileStatus : files) {
String name = eachFileStatus.getPath().getName();
assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
}
executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close();
resultFileData = getTableFileContents(path);
expected = "1|1|1\n" +
"1|1|2\n" +
"2|2|1\n" +
"3|2|1\n" +
"3|3|2\n" +
"||\n" +
"||\n" +
"||\n";
assertEquals(expected + expected, resultFileData);
files = fs.listStatus(path);
assertNotNull(files);
assertEquals(2, files.length);
for (FileStatus eachFileStatus : files) {
String name = eachFileStatus.getPath().getName();
assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
}
} finally {
if (fs != null) {
fs.delete(path, true);
}
}
}
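// INSERT INTO a column-partitioned table: a second insert doubles the rows and leaves two data files in each n_nationkey= partition directory (26 directories in total).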
@Test
public final void testInsertIntoPartitionedTable() throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier("testInsertIntoPartitionedTable");
executeString("create table " + tableName + " (n_name TEXT, n_regionkey INT4)" +
"USING csv PARTITION by column(n_nationkey INT4)" ).close();
try {
executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close();
ResultSet res = executeString("select * from " + tableName);
String expected = "n_name,n_regionkey,n_nationkey\n" +
"-------------------------------\n" +
"ALGERIA,0,0\n" +
"ARGENTINA,1,1\n" +
"IRAN,4,10\n" +
"IRAQ,4,11\n" +
"JAPAN,2,12\n" +
"JORDAN,4,13\n" +
"KENYA,0,14\n" +
"MOROCCO,0,15\n" +
"MOZAMBIQUE,0,16\n" +
"PERU,1,17\n" +
"CHINA,2,18\n" +
"ROMANIA,3,19\n" +
"BRAZIL,1,2\n" +
"SAUDI ARABIA,4,20\n" +
"VIETNAM,2,21\n" +
"RUSSIA,3,22\n" +
"UNITED KINGDOM,3,23\n" +
"UNITED STATES,1,24\n" +
"CANADA,1,3\n" +
"EGYPT,4,4\n" +
"ETHIOPIA,0,5\n" +
"FRANCE,3,6\n" +
"GERMANY,3,7\n" +
"INDIA,2,8\n" +
"INDONESIA,2,9\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n";
assertEquals(expected, resultSetToString(res));
res.close();
executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close();
res = executeString("select * from " + tableName);
expected = "n_name,n_regionkey,n_nationkey\n" +
"-------------------------------\n" +
"ALGERIA,0,0\n" +
"ALGERIA,0,0\n" +
"ARGENTINA,1,1\n" +
"ARGENTINA,1,1\n" +
"IRAN,4,10\n" +
"IRAN,4,10\n" +
"IRAQ,4,11\n" +
"IRAQ,4,11\n" +
"JAPAN,2,12\n" +
"JAPAN,2,12\n" +
"JORDAN,4,13\n" +
"JORDAN,4,13\n" +
"KENYA,0,14\n" +
"KENYA,0,14\n" +
"MOROCCO,0,15\n" +
"MOROCCO,0,15\n" +
"MOZAMBIQUE,0,16\n" +
"MOZAMBIQUE,0,16\n" +
"PERU,1,17\n" +
"PERU,1,17\n" +
"CHINA,2,18\n" +
"CHINA,2,18\n" +
"ROMANIA,3,19\n" +
"ROMANIA,3,19\n" +
"BRAZIL,1,2\n" +
"BRAZIL,1,2\n" +
"SAUDI ARABIA,4,20\n" +
"SAUDI ARABIA,4,20\n" +
"VIETNAM,2,21\n" +
"VIETNAM,2,21\n" +
"RUSSIA,3,22\n" +
"RUSSIA,3,22\n" +
"UNITED KINGDOM,3,23\n" +
"UNITED KINGDOM,3,23\n" +
"UNITED STATES,1,24\n" +
"UNITED STATES,1,24\n" +
"CANADA,1,3\n" +
"CANADA,1,3\n" +
"EGYPT,4,4\n" +
"EGYPT,4,4\n" +
"ETHIOPIA,0,5\n" +
"ETHIOPIA,0,5\n" +
"FRANCE,3,6\n" +
"FRANCE,3,6\n" +
"GERMANY,3,7\n" +
"GERMANY,3,7\n" +
"INDIA,2,8\n" +
"INDIA,2,8\n" +
"INDONESIA,2,9\n" +
"INDONESIA,2,9\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n";
assertEquals(expected, resultSetToString(res));
res.close();
TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName);
assertNotNull(tableDesc);
Path path = new Path(tableDesc.getUri());
FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
FileStatus[] files = fs.listStatus(path);
assertNotNull(files);
assertEquals(26, files.length);
for (FileStatus eachFileStatus: files) {
assertTrue(eachFileStatus.getPath().getName().startsWith("n_nationkey="));
FileStatus[] dataFiles = fs.listStatus(eachFileStatus.getPath());
assertEquals(2, dataFiles.length);
for (FileStatus eachDataFileStatus: dataFiles) {
String name = eachDataFileStatus.getPath().getName();
assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
}
}
} finally {
executeString("DROP TABLE " + tableName + " PURGE");
}
}
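// INSERT OVERWRITE selecting fewer columns than the target table defines; the table schema must remain unchanged.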
@Test
public final void testInsertOverwriteSmallerColumns() throws Exception {
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1");
res = executeFile("testInsertOverwriteSmallerColumns.sql");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
assertEquals(originalDesc.getSchema(), desc.getSchema());
executeString("DROP TABLE table1 PURGE");
}
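// INSERT OVERWRITE with an explicit target column list; the omitted column should read back as NULL.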
@Test
public final void testInsertOverwriteWithTargetColumns() throws Exception {
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1");
res = executeFile("testInsertOverwriteWithTargetColumns.sql");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + IdentifierUtil.denormalizeIdentifier(getCurrentDatabase()) + ".table1");
assertTrue(res.next());
assertEquals(1, res.getLong(1));
assertTrue(0f == res.getFloat(2));
assertTrue(res.wasNull());
assertTrue(17.0 == res.getFloat(3));
assertTrue(res.next());
assertEquals(1, res.getLong(1));
assertTrue(0f == res.getFloat(2));
assertTrue(res.wasNull());
assertTrue(36.0 == res.getFloat(3));
assertTrue(res.next());
assertEquals(2, res.getLong(1));
assertTrue(0f == res.getFloat(2));
assertTrue(res.wasNull());
assertTrue(38.0 == res.getFloat(3));
assertTrue(res.next());
assertEquals(3, res.getLong(1));
assertTrue(0f == res.getFloat(2));
assertTrue(res.wasNull());
assertTrue(45.0 == res.getFloat(3));
assertTrue(res.next());
assertEquals(3, res.getLong(1));
assertTrue(0f == res.getFloat(2));
assertTrue(res.wasNull());
assertTrue(49.0 == res.getFloat(3));
assertTrue(res.next());
assertEquals(0, res.getLong(1));
assertEquals(0.0, res.getFloat(2), 0.0);
assertEquals(0.0, res.getFloat(3), 0.0);
assertTrue(res.next());
assertEquals(0, res.getLong(1));
assertEquals(0.0, res.getFloat(2), 0.0);
assertEquals(0.0, res.getFloat(3), 0.0);
assertTrue(res.next());
assertEquals(0, res.getLong(1));
assertEquals(0.0, res.getFloat(2), 0.0);
assertEquals(0.0, res.getFloat(3), 0.0);
assertFalse(res.next());
res.close();
assertEquals(originalDesc.getSchema(), desc.getSchema());
executeString("DROP TABLE table1 PURGE");
}
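// INSERT OVERWRITE ... SELECT * with a filter on l_orderkey = 3 should leave exactly two rows.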
@Test
public final void testInsertOverwriteWithAsterisk() throws Exception {
ResultSet res = executeFile("full_table_csv_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_csv"));
res = executeString("insert overwrite into full_table_csv select * from default.lineitem where l_orderkey = 3");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_csv");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(2, desc.getStats().getNumRows().intValue());
}
executeString("DROP TABLE full_table_csv PURGE");
}
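// Populates lineitem_year_month from load_to_lineitem_year_month.sql and verifies this test's query result with assertResultSet.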
@Test
public final void testInsertOverwriteWithAsteriskAndMore() throws Exception {
ResultSet res = executeFile("lineitem_year_month_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "lineitem_year_month"));
res = executeFile("load_to_lineitem_year_month.sql");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "lineitem_year_month");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
res = executeQuery();
assertResultSet(res);
res.close();
executeString("DROP TABLE lineitem_year_month PURGE");
}
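// CREATE TABLE AS SELECT copies all 8 lineitem keys, then INSERT OVERWRITE with a filter leaves only the two rows with l_orderkey = 3.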
@Test
public final void testInsertOverwriteIntoSelect() throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier("insertoverwriteintoselect");
ResultSet res = executeString("create table " + tableName + " as select l_orderkey from default.lineitem");
assertFalse(res.next());
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
TableDesc orderKeys = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, orderKeys.getStats().getNumRows().intValue());
}
// this query will result in two rows.
res = executeString("insert overwrite into " + tableName + " select l_orderkey from default.lineitem where l_orderkey = 3");
assertFalse(res.next());
res.close();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
orderKeys = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(2, orderKeys.getStats().getNumRows().intValue());
}
executeString("DROP TABLE " + tableName + " PURGE");
}
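// INSERT OVERWRITE into a table created from a mixed-case identifier.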
@Test
public final void testInsertOverwriteCapitalTableName() throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier("testInsertOverwriteCapitalTableName");
ResultSet res = executeString("create table " + tableName + " as select * from default.lineitem");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
res = executeString("insert overwrite into " + tableName + " select * from default.lineitem where l_orderkey = 3");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(2, desc.getStats().getNumRows().intValue());
}
executeString("DROP TABLE " + tableName + " PURGE");
}
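// INSERT OVERWRITE INTO LOCATION driven by this test case's query file; exactly one output file is expected at the target path.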
@Test
public final void testInsertOverwriteLocation() throws Exception {
ResultSet res = executeQuery();
res.close();
FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
assertTrue(fs.exists(new Path("/tajo-data/testInsertOverwriteCapitalTableName")));
assertEquals(1, fs.listStatus(new Path("/tajo-data/testInsertOverwriteCapitalTableName")).length);
}
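// INSERT OVERWRITE into a compressed table; every output file must be recognized as DeflateCodec by the codec factory.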
@Test
public final void testInsertOverwriteWithCompression() throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier("testInsertOverwriteWithCompression");
ResultSet res = executeFile("testInsertOverwriteWithCompression_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
res = executeQuery();
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(2, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
assertTrue(fs.exists(new Path(desc.getUri())));
CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
for (FileStatus file : fs.listStatus(new Path(desc.getUri()))) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
executeString("DROP TABLE " + tableName + " PURGE");
}
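// Same compression check for INSERT OVERWRITE INTO LOCATION; skipped entirely when the Hive catalog store is running.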
@Test
public final void testInsertOverwriteLocationWithCompression() throws Exception {
if (!testingCluster.isHiveCatalogStoreRunning()) {
ResultSet res = executeQuery();
res.close();
FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
Path path = new Path("/tajo-data/testInsertOverwriteLocationWithCompression");
assertTrue(fs.exists(path));
assertEquals(1, fs.listStatus(path).length);
CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
for (FileStatus file : fs.listStatus(path)){
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
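// Asterisk overwrite into a Parquet table; results are compared against stored .result files.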
@Test
public final void testInsertOverwriteWithAsteriskUsingParquet() throws Exception {
if (!testingCluster.isHiveCatalogStoreRunning()) {
ResultSet res = executeFile("full_table_parquet_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_parquet"));
res = executeString(
"insert overwrite into full_table_parquet select * from default.lineitem where l_orderkey = 3");
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_parquet");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(2, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from full_table_parquet;");
assertResultSet(res);
res = executeString("select l_orderkey, l_partkey from full_table_parquet;");
assertResultSet(res, "testInsertOverwriteWithAsteriskUsingParquet2.result");
executeString("DROP TABLE full_table_parquet PURGE");
}
}
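// INSERT OVERWRITE of selected and derived columns into a Parquet table, verified row by row against an inline expected string.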
@Test
public final void testInsertOverwriteIntoParquet() throws Exception {
if (!testingCluster.isHiveCatalogStoreRunning()) {
executeString("create table parquet_table " +
"(l_orderkey int4, l_shipdate text, l_shipdate_function text) using parquet").close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table"));
executeString(
"insert overwrite into parquet_table " +
"select l_orderkey, l_shipdate, substr(l_shipdate, 1, 10) from default.lineitem").close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " +
"from parquet_table ");
String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" +
"-------------------------------\n" +
"1,1996-03-13,1996-03-13\n" +
"1,1996-04-12,1996-04-12\n" +
"2,1997-01-28,1997-01-28\n" +
"3,1994-02-02,1994-02-02\n" +
"3,1993-11-09,1993-11-09\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n";
assertEquals(expected, resultSetToString(res));
res.close();
executeString("DROP TABLE parquet_table PURGE");
}
}
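// Same as the previous test, but the Parquet table is partitioned by l_shipdate.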
@Test
public final void testInsertOverwriteIntoPartitionedParquet() throws Exception {
if (!testingCluster.isHiveCatalogStoreRunning()) {
executeString("create table parquet_table " +
"(l_orderkey int4, l_shipdate_function text) using parquet partition by column (l_shipdate text)").close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table"));
executeString(
"insert overwrite into parquet_table " +
"select l_orderkey, substr(l_shipdate, 1, 10), l_shipdate from default.lineitem").close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " +
"from parquet_table ");
String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" +
"-------------------------------\n" +
"3,1993-11-09,1993-11-09\n" +
"3,1994-02-02,1994-02-02\n" +
"1,1996-03-13,1996-03-13\n" +
"1,1996-04-12,1996-04-12\n" +
"2,1997-01-28,1997-01-28\n" +
"null,null,null\n" +
"null,null,null\n" +
"null,null,null\n";
assertEquals(expected, resultSetToString(res));
res.close();
executeString("DROP TABLE parquet_table PURGE");
}
}
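// INSERT OVERWRITE driven by this test case's query file; afterwards table1 should report 8 rows.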
@Test
public final void testInsertOverwriteWithDatabase() throws Exception {
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
res = executeQuery();
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(8, desc.getStats().getNumRows().intValue());
}
executeString("DROP TABLE table1 PURGE");
}
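// Helper (not annotated with @Test): runs an INSERT OVERWRITE that supplies every column and verifies the single resulting row.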
public final void testInsertOverwriteAllValues(String rawTableName, String query) throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier(rawTableName);
ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
res = executeString(query);
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(1, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + tableName + ";");
assertTrue(res.next());
assertEquals(3, res.getMetaData().getColumnCount());
assertEquals(1, res.getInt(1));
assertEquals(2.1f, res.getFloat(2), 1e-6);
assertEquals("test", res.getString(3));
res.close();
executeString("DROP TABLE " + tableName + " PURGE");
}
@Test
public final void testInsertOverwriteAllValues1() throws Exception {
String tbName = getMethodName();
testInsertOverwriteAllValues(tbName, "insert overwrite into " + tbName + " select 1::INT4, 2.1::FLOAT4, 'test'; ");
}
@Test
public final void testInsertOverwriteAllValues2() throws Exception {
String tbName = getMethodName();
testInsertOverwriteAllValues(tbName, "insert overwrite into " + tbName + " VALUES(1::INT4, 2.1::FLOAT4, 'test');");
}
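// Helper (not annotated with @Test): runs an INSERT OVERWRITE that targets only col1 and col3 and expects col2 to come back as NULL.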
public final void testInsertOverwriteSomeValues(String rawTableName, String sql) throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier(rawTableName);
ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
res = executeString(sql);
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(1, desc.getStats().getNumRows().intValue());
}
res = executeString("select * from " + tableName + ";");
assertTrue(res.next());
assertEquals(3, res.getMetaData().getColumnCount());
assertEquals(1, res.getInt(1));
assertNull(res.getString(2));
assertEquals(0.0, res.getDouble(2), 0.0);
assertEquals("test", res.getString(3));
res.close();
executeString("DROP TABLE " + tableName + " PURGE");
}
@Test
public final void testInsertOverwriteTableSomeValues1() throws Exception {
String tbName = getMethodName();
testInsertOverwriteSomeValues(tbName,
"insert overwrite into " + tbName + " (col1, col3) select 1::INT4, 'test'");
}
@Test
public final void testInsertOverwriteTableSomeValues2() throws Exception {
String tbName = getMethodName();
testInsertOverwriteSomeValues(tbName,
"insert overwrite into " + tbName + " (col1, col3) VALUES(1::INT4, 'test')");
}
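// INSERT OVERWRITE INTO LOCATION from a constant SELECT (no FROM clause); the single Deflate-compressed output file is decompressed and its contents verified.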
@Test
public final void testInsertOverwritePathWithNonFromQuery() throws Exception {
ResultSet res = executeString("insert overwrite into location " +
"'/tajo-data/testInsertOverwritePathWithNonFromQuery' " +
"USING csv WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"select 1::INT4, 2.1::FLOAT4, 'test'");
res.close();
FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
Path path = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery");
assertTrue(fs.exists(path));
assertEquals(1, fs.listStatus(path).length);
CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
FileStatus file = fs.listStatus(path)[0];
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(codec.createInputStream(fs.open(file.getPath()))))) {
String line = reader.readLine();
assertNotNull(line);
String[] tokens = line.split("\\|");
assertEquals(3, tokens.length);
assertEquals("1", tokens[0]);
assertEquals("2.1", tokens[1]);
assertEquals("test", tokens[2]);
}
}
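// INSERT OVERWRITE fed by a UNION query; the table file contents are compared verbatim.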
@Test
public final void testInsertOverwriteWithUnion() throws Exception {
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
res = executeFile("testInsertOverwriteWithUnion.sql");
res.close();
String tableDatas = getTableFileContents("table1");
String expected = "1|1|17.0\n" +
"1|1|36.0\n" +
"2|2|38.0\n" +
"3|2|45.0\n" +
"3|3|49.0\n" +
"||\n" +
"||\n" +
"||\n" +
"1|3|173665.47\n" +
"2|4|46929.18\n" +
"3|2|193846.25\n" +
"||\n" +
"||\n" +
"||\n";
assertNotNull(tableDatas);
assertEquals(expected, tableDatas);
executeString("DROP TABLE table1 PURGE");
}
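// Variant of the previous UNION test, driven by testInsertOverwriteWithUnionDifferentAlias.sql.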
@Test
public final void testInsertOverwriteWithUnionDifferentAlias() throws Exception {
ResultSet res = executeFile("table1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
res = executeFile("testInsertOverwriteWithUnionDifferentAlias.sql");
res.close();
String tableDatas = getTableFileContents("table1");
String expected = "1|1|17.0\n" +
"1|1|36.0\n" +
"2|2|38.0\n" +
"3|2|45.0\n" +
"3|3|49.0\n" +
"||\n" +
"||\n" +
"||\n" +
"1|3|173665.47\n" +
"2|4|46929.18\n" +
"3|2|193846.25\n" +
"||\n" +
"||\n" +
"||\n";
assertNotNull(tableDatas);
assertEquals(expected, tableDatas);
executeString("DROP TABLE table1 PURGE");
}
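// UNION result written with INSERT OVERWRITE INTO LOCATION; the files under the target path are compared against the expected rows.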
@Test
public final void testInsertOverwriteLocationWithUnion() throws Exception {
ResultSet res = executeFile("testInsertOverwriteLocationWithUnion.sql");
res.close();
String resultDatas= getTableFileContents(new Path("/tajo-data/testInsertOverwriteLocationWithUnion"));
String expected = "1|1|17.0\n" +
"1|1|36.0\n" +
"2|2|38.0\n" +
"3|2|45.0\n" +
"3|3|49.0\n" +
"||\n" +
"||\n" +
"||\n" +
"1|3|173665.47\n" +
"2|4|46929.18\n" +
"3|2|193846.25\n" +
"||\n" +
"||\n" +
"||\n";
assertNotNull(resultDatas);
assertEquals(expected, resultDatas);
}
@Test
public final void testInsertOverwriteLocationWithUnionDifferenceAlias() throws Exception {
ResultSet res = executeFile("testInsertOverwriteLocationWithUnionDifferenceAlias.sql");
res.close();
String resultDatas= getTableFileContents(new Path("/tajo-data/testInsertOverwriteLocationWithUnionDifferenceAlias"));
String expected = "1|1|17.0\n" +
"1|1|36.0\n" +
"2|2|38.0\n" +
"3|2|45.0\n" +
"3|3|49.0\n" +
"||\n" +
"||\n" +
"||\n" +
"1|3|173665.47\n" +
"2|4|46929.18\n" +
"3|2|193846.25\n" +
"||\n" +
"||\n" +
"||\n";
assertNotNull(resultDatas);
assertEquals(expected, resultDatas);
}
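// INSERT where the SELECT column order differs from the target table's column order (see nation_diff_col_order.ddl).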
@Test
public final void testInsertWithDifferentColumnOrder() throws Exception {
ResultSet res = executeFile("nation_diff_col_order.ddl");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "nation_diff"));
try {
res = executeFile("testInsertWithDifferentColumnOrder.sql");
res.close();
res = executeString("select * from nation_diff");
assertResultSet(res);
} finally {
executeString("drop table nation_diff purge;");
}
}
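// CHAR column declared without a length; after stripping '\0' padding the stored value should be a single character.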
@Test
public final void testFixedCharSelectWithNoLength() throws Exception {
ResultSet res = executeFile("test1_nolength_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "test1"));
res = executeFile("testInsertIntoSelectWithFixedSizeCharWithNoLength.sql");
res.close();
//remove \0
String resultDatas = getTableFileContents("test1").replaceAll("\0","");
String expected = "a\n";
assertNotNull(resultDatas);
assertEquals(expected.length(), resultDatas.length());
assertEquals(expected, resultDatas);
executeString("DROP TABLE test1 PURGE");
}
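// Fixed-size CHAR columns with declared lengths; '\0' padding is stripped before comparing the stored values.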
@Test
public final void testFixedCharSelect() throws Exception {
ResultSet res = executeFile("test1_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), "test1"));
res = executeFile("testInsertIntoSelectWithFixedSizeChar.sql");
res.close();
//remove \0
String resultDatas = getTableFileContents("test1").replaceAll("\0","");
String expected = "a\n" +
"abc\n" +
"abcde\n";
assertNotNull(resultDatas);
assertEquals(expected.length(), resultDatas.length());
assertEquals(expected, resultDatas);
executeString("DROP TABLE test1 PURGE");
}
}