blob: fa15e2876aebe92cf83f76de381aceae8d60d006 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
/**
* Tests related to support of ADD PARTITION with Acid/MM tables
* Most tests run in vectorized and non-vectorized mode since we currently have a vectorized and
* a non-vectorized acid readers and it's critical that ROW_IDs are generated the same way.
*
* Side Note: Alter Table Add Partition does no validations on the data - not file name checks,
* not Input/OutputFormat, bucketing etc...
*/
public class TestTxnAddPartition extends TxnCommandsBaseForTests {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnAddPartition.class);
private static final String TEST_DATA_DIR =
new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnAddPartition.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Rule
public ExpectedException exception = ExpectedException.none();
@Override
protected String getTestDataDir() {
return TEST_DATA_DIR;
}
@Test
public void addPartition() throws Exception {
addPartition(false);
}
@Test
public void addPartitionVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
addPartition(true);
}
/**
* Tests adding multiple partitions
* adding partition w/o location
* adding partition when it already exists
* adding partition when it already exists with "if not exists"
*/
private void addPartition(boolean isVectorized) throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" +
" tblproperties('transactional'='true')");
runStatementOnDriver("create table Tstage (a int, b int) stored as orc" +
" tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'");
runStatementOnDriver("ALTER TABLE T ADD" +
" PARTITION (p=0) location '" + getWarehouseDir() + "/1/data'" +
" PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" +
" PARTITION (p=2)");
String testQuery = isVectorized ? "select ROW__ID, p, a, b from T order by p, ROW__ID" :
"select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID";
String[][] expected = new String[][]{
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2",
"warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4",
"warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t0\t2",
"warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4",
"warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG);
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'");
//should be an error since p=3 exists
CommandProcessorException e =
runStatementOnDriverNegative("ALTER TABLE T ADD PARTITION (p=0) location '" + getWarehouseDir() + "/3/data'");
Assert.assertTrue("add existing partition",
e.getMessage() != null && e.getMessage().contains("Partition already exists"));
//should be no-op since p=3 exists
String stmt = "ALTER TABLE T ADD IF NOT EXISTS " +
"PARTITION (p=0) location '" + getWarehouseDir() + "/3/data' "//p=0 exists and is not empty
+ "PARTITION (p=2) location '" + getWarehouseDir() + "/3/data'"//p=2 exists and is empty
+ "PARTITION (p=3) location '" + getWarehouseDir() + "/3/data'";//p=3 doesn't exist
runStatementOnDriver(stmt);
String[][] expected2 = new String[][]{
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2",
"warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4",
"warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t0\t2",
"warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4",
"warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t0\t2",
"warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t3\t0\t4",
"warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}};
checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG);
}
@Test
public void addPartitionMM() throws Exception {
addPartitionMM(false);
}
@Test
public void addPartitionMMVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
addPartitionMM(true);
}
/**
* Micro managed table test
* Tests adding multiple partitions
* adding partition w/o location
* adding partition when it already exists
* adding partition when it already exists with "if not exists"
*/
private void addPartitionMM(boolean isVectorized) throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" +
" tblproperties('transactional'='true', 'transactional_properties'='insert_only')");
runStatementOnDriver("create table Tstage (a int, b int) stored as orc" +
" tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'");
runStatementOnDriver("ALTER TABLE T ADD" +
" PARTITION (p=0) location '" + getWarehouseDir() + "/1/data'" +
" PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" +
" PARTITION (p=2)");
String testQuery = isVectorized ? "select p, a, b from T order by p, a, b" :
"select p, a, b, INPUT__FILE__NAME from T order by p, a, b";
String[][] expected = new String[][]{
{"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG);
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'");
//should be an error since p=3 exists
CommandProcessorException e =
runStatementOnDriverNegative("ALTER TABLE T ADD PARTITION (p=0) location '" + getWarehouseDir() + "/3/data'");
Assert.assertTrue("add existing partition",
e.getMessage() != null && e.getMessage().contains("Partition already exists"));
//should be no-op since p=3 exists
runStatementOnDriver("ALTER TABLE T ADD IF NOT EXISTS " +
"PARTITION (p=0) location '" + getWarehouseDir() + "/3/data' "//p=0 exists and is not empty
+ "PARTITION (p=2) location '" + getWarehouseDir() + "/3/data'"//p=2 exists and is empty
+ "PARTITION (p=3) location '" + getWarehouseDir() + "/3/data'");//p=3 doesn't exist
String[][] expected2 = new String[][]{
{"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"3\t0\t2", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"},
{"3\t0\t4", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}};
checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG);
}
@Test
public void addPartitionBucketed() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) " +
"clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')");
runStatementOnDriver("create table Tstage (a int, b int) clustered by (a) into 2 " +
"buckets stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(0,2),(1,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
runStatementOnDriver("ALTER TABLE T ADD PARTITION (p=0) location '"
+ getWarehouseDir() + "/1/data'");
List<String> rs = runStatementOnDriver(
"select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
String[][] expected = new String[][]{
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t0\t0\t2",
"warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t1\t4",
"warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}};
checkExpected(rs, expected, "add partition (p=0)");
}
private void checkExpected(List<String> rs, String[][] expected, String msg) {
super.checkExpected(rs, expected, msg, LOG, true);
}
/**
* Check to make sure that if files being loaded don't have standard Hive names, that they are
* renamed during add.
*/
@Test
public void addPartitionRename() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) " +
"stored as orc tblproperties('transactional'='true')");
//bucketed just so that we get 2 files
runStatementOnDriver("create table Tstage (a int, b int) clustered by (a) into 2 " +
"buckets stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("insert into Tstage values(0,2),(1,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
FileSystem fs = FileSystem.get(hiveConf);
fs.rename(new Path(getWarehouseDir() + "/1/data/000000_0"), new Path(getWarehouseDir() + "/1/data/part-m000"));
fs.rename(new Path(getWarehouseDir() + "/1/data/000001_0"), new Path(getWarehouseDir() + "/1/data/part-m001"));
runStatementOnDriver("ALTER TABLE T ADD PARTITION (p=0) location '"
+ getWarehouseDir() + "/1/data'");
List<String> rs = runStatementOnDriver(
"select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
String[][] expected = new String[][]{
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t0\t0\t2",
"warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t1\t4",
"warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}};
checkExpected(rs, expected, "add partition (p=0)");
}
/**
* {@link TestDbTxnManager2#testAddPartitionLocks}
*/
@Ignore
@Test
public void testLocks() throws Exception {
}
@Test
public void addPartitionTransactional() throws Exception {
exception.expect(RuntimeException.class);
exception.expectMessage("was created by Acid write");
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) " +
"clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')");
runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) clustered by (a) into 2 " +
"buckets stored as orc tblproperties('transactional'='true')");
runStatementOnDriver("insert into Tstage partition(p=1) values(0,2),(1,4)");
runStatementOnDriver("ALTER TABLE T ADD PARTITION (p=0) location '"
+ getWarehouseDir() + "/tstage/p=1/delta_0000001_0000001_0000/bucket_00001_0'");
}
}