// blob: d1254ab3a90b0cb27714d54aebb5d1257e589933 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.metadata;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.List;
/**
 * Tests for the MSCK REPAIR TABLE operation on transactional tables.
 *
 * <p>Most tests follow the same pattern: rows are written into a source table
 * ({@code Table.ACIDTBLPART}, {@code Table.ACIDTBL} or a table created by the test), the resulting
 * partition or delta directories are copied on the file system under a freshly created target table,
 * and {@code msck repair table} is then expected to make the copied data readable through the target.
 */
public class TestMSCKRepairOnAcid extends TxnCommandsBaseForTests {

  private static final String TEST_DATA_DIR = new File(
      System.getProperty("java.io.tmpdir") + File.separator + TestMSCKRepairOnAcid.class.getCanonicalName()
      + "-" + System.currentTimeMillis()).getPath().replaceAll("\\\\", "/");

  /** TBLPROPERTIES content used for the full ACID tables created by these tests. */
  private static final String FULL_ACID_PROPERTIES = "'transactional'='true'";

  /** TBLPROPERTIES content used for the insert-only transactional tables created by these tests. */
  private static final String INSERT_ONLY_PROPERTIES =
      "'transactional'='true', 'transactional_properties'='insert_only'";

  /**
   * Marker expected in front of the visibility txnId at the end of a compacted directory name.
   * NOTE(review): mirrors the suffix written by AcidUtils.addVisibilitySuffix - confirm if that format changes.
   */
  private static final String VISIBILITY_MARKER = "_v";

  /** Name of the partitioned target table that the copied partitions are repaired into. */
  private final String acidTblPartMsck = "acidtblpartmsck";

  @Override
  protected String getTestDataDir() {
    return TEST_DATA_DIR;
  }

  /**
   * A new partition copied under a table containing only deltas.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddPartitionDeltas() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Two inserts into the same partition
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p1'),(2,3,'p1'),(3,4,'p1')");
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " where p='p1' order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);

    // copy files on fs
    FileSystem fs = FileSystem.get(hiveConf);
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p1");
    assertDirEntryCount(fs, warehousePartitionDir(acidTblPartMsck, "p=p1"), 2);

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " where p='p1' order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * Multiple partitions copied under a table containing only deltas.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddMultiPartitionDeltas() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(4,4,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 }, { 4, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);

    // copy files on fs
    FileSystem fs = FileSystem.get(hiveConf);
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p1");
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p2");

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * A new partition copied under a table containing only deltas, but the table already contains allocated writes,
   * and the restored partition has higher writeId.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddPartitionHighWriteIdException() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);
    // Insert data in p1 so we allocate writeId in the msck table
    runStatementOnDriver("insert into " + acidTblPartMsck + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");

    // copy files on fs, without overwriting anything already present in the target
    FileSystem fs = FileSystem.get(hiveConf);
    copyDirInto(fs, warehousePartitionDir(acidTblPartDirName(), "p=p2"), warehouseTableDir(acidTblPartMsck),
        false, false);
    // One partition written, one copied
    assertDirEntryCount(fs, warehouseTableDir(acidTblPartMsck), 2);

    // call msck repair, it should fail, since it will find a delta folder with writeId 2
    // that is higher than the allocated max in the table
    CommandProcessorException e = runStatementOnDriverNegative("msck repair table " + acidTblPartMsck);
    Assert.assertEquals(-1, e.getErrorCode());
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * A new partition copied under a table containing only deltas, but the table already contains allocated writes,
   * and the restored partition has lower writeId.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddPartitionLowerWriteId() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);
    // Insert data in p2 so we allocate writeId in the msck table
    runStatementOnDriver("insert into " + acidTblPartMsck + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    runStatementOnDriver("insert into " + acidTblPartMsck + " partition(p) values(4,5,'p2')");

    // copy files on fs, without overwriting anything already present in the target
    FileSystem fs = FileSystem.get(hiveConf);
    copyDirInto(fs, warehousePartitionDir(acidTblPartDirName(), "p=p1"), warehouseTableDir(acidTblPartMsck),
        false, false);
    // One partition written, one copied
    assertDirEntryCount(fs, warehouseTableDir(acidTblPartMsck), 2);

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    // The allocated writeId should be 2 and the copied partition has 1 as maximum, so we are on the safe side
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " order by a, b");
    int[][] expected2 = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 }, { 4, 5 } };
    Assert.assertEquals(stringifyValues(expected2), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * A new partition copied under a table containing a compacted delta.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddPartitionMinorCompacted() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(4,4,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    // run compaction
    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " partition (p='p1') compact 'minor'");
    runWorker(hiveConf);
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 }, { 4, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);

    // copy files on fs
    FileSystem fs = FileSystem.get(hiveConf);
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p1");
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p2");
    // two + one delta
    assertDirEntryCount(fs, warehousePartitionDir(acidTblPartMsck, "p=p1"), 3);

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * A new partition copied under a table containing compacted base.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddPartitionMajorCompacted() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(4,4,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    // run compaction
    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " partition (p='p1') compact 'major'");
    runWorker(hiveConf);
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 }, { 4, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);

    // copy files on fs
    FileSystem fs = FileSystem.get(hiveConf);
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p1");
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p2");
    // two delta + a base
    assertDirEntryCount(fs, warehousePartitionDir(acidTblPartMsck, "p=p1"), 3);

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * Delete one partition from a table, drop the partition from the HMS, then restore the partition
   * and repair the table.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testBackUpAndRestorePartition() throws Exception {
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(4,4,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 }, { 4, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    // move the partition directory into a backup folder (the source is deleted by the copy)
    FileSystem fs = FileSystem.get(hiveConf);
    Path backupDir = warehouseTableDir("mybackup");
    Assert.assertTrue("Failed to create " + backupDir, fs.mkdirs(backupDir));
    copyDirInto(fs, warehousePartitionDir(acidTblPartDirName(), "p=p1"), backupDir, true, true);

    // call msck repair to remove partition p1
    runStatementOnDriver("msck repair table " + Table.ACIDTBLPART + " SYNC PARTITIONS");
    r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected2 = { { 1, 2 }, { 2, 3 }, { 3, 4 }};
    Assert.assertEquals(stringifyValues(expected2), r);

    // copy the data back
    copyDirInto(fs, warehousePartitionDir("mybackup", "p=p1"), warehouseTableDir(acidTblPartDirName()), true, true);

    // call msck repair to add the partition back
    runStatementOnDriver("msck repair table " + Table.ACIDTBLPART);
    r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
  }

  /**
   * Add a new partition to the table that contains a compacted folder with the visibilityTxnId
   * higher than the HighWaterMark in the HMS. This could happen if the HMS data was lost and we try to
   * repair the metadata. The system will set the txnId forward in the HMS.
   * @throws Exception if a statement, a file system operation or a metastore call fails
   */
  @Test
  public void testAddPartitionHighVisibilityId() throws Exception {
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(4,4,'p1')");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    // run compaction and clean up the obsolete deltas
    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " partition (p='p1') compact 'minor'");
    runWorker(hiveConf);
    runCleaner(hiveConf);
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBLPART + " order by a, b");
    int[][] expected = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 3, 4 }, { 4, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(FULL_ACID_PROPERTIES);

    // copy files on fs
    FileSystem fs = FileSystem.get(hiveConf);
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p1");
    copyPartitionToMsckTable(fs, acidTblPartDirName(), "p=p2");
    // one compacted delta
    FileStatus[] fileStatuses = assertDirEntryCount(fs, warehousePartitionDir(acidTblPartMsck, "p=p1"), 1);

    // Rename the deltaDir to add a higher visibility transactionId
    Path deltaDir = fileStatuses[0].getPath();
    AcidUtils.ParsedDelta parsedDelta = AcidUtils.parsedDelta(deltaDir, fs);
    long oldTxnId = parsedDelta.getVisibilityTxnId();
    String deltaDirName = deltaDir.toString();
    // Locate the existing suffix instead of assuming its width, and make sure it sits in the last path component
    int suffixStart = deltaDirName.lastIndexOf(VISIBILITY_MARKER);
    Assert.assertTrue("Compacted delta has no visibility suffix: " + deltaDirName,
        suffixStart > deltaDirName.lastIndexOf('/'));
    String newDeltaDir = AcidUtils.addVisibilitySuffix(deltaDirName.substring(0, suffixStart), oldTxnId + 100);
    // rename() can signal failure by returning false, so the result has to be checked
    Assert.assertTrue("Failed to rename " + deltaDir + " to " + newDeltaDir,
        fs.rename(deltaDir, new Path(newDeltaDir)));

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    long nextTxnId = txnHandler.openTxns(new OpenTxnRequest(1, "localhost", "me")).getTxn_ids().get(0);
    Assert.assertTrue("TxnId should be incremented", nextTxnId > (oldTxnId + 100));
    txnHandler.abortTxn(new AbortTxnRequest(nextTxnId));
    // We have to wait for the MetastoreConf.TXN_OPENTXN_TIMEOUT otherwise our hypothetical txnId in the visibilityTxnId
    // will be considered open and we will not read the folder
    Thread.sleep(1000);
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
  }

  /**
   * Add one partition to an insert-only table that contains an insert overwrite base folder.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testAddPartitionMMInsertOverwrite() throws Exception {
    final String mmTable = "mmtblpartmsck";
    final String sourceTable = "nonacidpartmsck";
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    runStatementOnDriver("drop table if exists " + mmTable);
    runStatementOnDriver("drop table if exists " + sourceTable);
    runStatementOnDriver("create table " + mmTable + "(a int, b int) partitioned by (p string) stored as orc"
        + " TBLPROPERTIES (" + INSERT_ONLY_PROPERTIES + ")");
    runStatementOnDriver("create table " + sourceTable + "(a int, b int) partitioned by (p string) stored as orc"
        + " TBLPROPERTIES ('transactional'='false')");
    // Insert few rows
    runStatementOnDriver("insert into " + mmTable + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
    runStatementOnDriver("insert into " + mmTable + " partition(p) values(4,4,'p1')");
    runStatementOnDriver("insert into " + mmTable + " partition(p) values(1,2,'p2'),(2,3,'p2'),(3,4,'p2')");
    // Insert some to the source
    runStatementOnDriver("insert into " + sourceTable + " partition(p) values(10,10,'p1'),(20,20,'p1'),(30,30,'p1')");
    runStatementOnDriver("insert into " + sourceTable + " partition(p) values(40,40,'p1')");
    // Overwrite a partition
    runStatementOnDriver("insert overwrite table " + mmTable + " PARTITION(p='p1') "
        + " select a,b from " + sourceTable + " where " + sourceTable + ".p='p1'");
    List<String> r = runStatementOnDriver("select a, b from " + mmTable + " order by a, b");
    int[][] expected = { { 1, 2 }, { 2, 3 }, { 3, 4 }, { 10, 10 }, { 20, 20 }, { 30, 30 }, { 40, 40 } };
    Assert.assertEquals(stringifyValues(expected), r);

    createMsckTargetTable(INSERT_ONLY_PROPERTIES);

    // copy files on fs
    FileSystem fs = FileSystem.get(hiveConf);
    copyPartitionToMsckTable(fs, mmTable, "p=p1");
    copyPartitionToMsckTable(fs, mmTable, "p=p2");
    // Two delta and one insert overwrite base
    assertDirEntryCount(fs, warehousePartitionDir(acidTblPartMsck, "p=p1"), 3);

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblPartMsck);
    r = runStatementOnDriver("select a, b from " + acidTblPartMsck + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblPartMsck);
    runStatementOnDriver("drop table if exists " + mmTable);
    runStatementOnDriver("drop table if exists " + sourceTable);
  }

  /**
   * For non partitioned acid table the writeId and visibility txnId repair should work the same.
   * @throws Exception if a statement or a file system operation fails
   */
  @Test
  public void testNonPartitionedTable() throws Exception {
    String acidTblMsck = "acidtblmsck";
    runStatementOnDriver("drop table if exists " + acidTblMsck);
    // Insert few rows
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1),(2,2),(3,3)");
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(4,4)");
    // run compaction
    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'minor'");
    runWorker(hiveConf);
    List<String> r = runStatementOnDriver("select a, b from " + Table.ACIDTBL + " order by a, b");
    int[][] expected = { { 1, 1 }, { 2, 2 }, { 3, 3 }, { 4, 4 } };
    Assert.assertEquals(stringifyValues(expected), r);
    // Create target table
    runStatementOnDriver("create table " + acidTblMsck
        + " (a int, b int) clustered by (a) into 2 buckets"
        + " stored as orc TBLPROPERTIES (" + FULL_ACID_PROPERTIES + ")");

    // copy every entry of the source table directory under the target table directory
    FileSystem fs = FileSystem.get(hiveConf);
    Path targetDir = warehouseTableDir(acidTblMsck);
    for (FileStatus status : fs.listStatus(warehouseTableDir(Table.ACIDTBL.toString().toLowerCase()))) {
      copyDirInto(fs, status.getPath(), targetDir, false, true);
    }
    // two + one delta
    assertDirEntryCount(fs, targetDir, 3);

    // call msck repair
    runStatementOnDriver("msck repair table " + acidTblMsck);
    r = runStatementOnDriver("select a, b from " + acidTblMsck + " order by a, b");
    Assert.assertEquals(stringifyValues(expected), r);
    runStatementOnDriver("drop table if exists " + acidTblMsck);
  }

  /**
   * Creates the bucketed, partitioned ORC target table {@link #acidTblPartMsck}.
   * @param tblProperties content placed between the parentheses of the TBLPROPERTIES clause
   * @throws Exception if the statement fails
   */
  private void createMsckTargetTable(String tblProperties) throws Exception {
    runStatementOnDriver("create table " + acidTblPartMsck
        + " (a int, b int) partitioned by (p string) clustered by (a) into 2 buckets"
        + " stored as orc TBLPROPERTIES (" + tblProperties + ")");
  }

  /**
   * Copies one partition directory of a source table under the directory of {@link #acidTblPartMsck},
   * keeping the source and overwriting existing files.
   * @param fs file system holding the warehouse
   * @param sourceTableDir directory name of the source table inside the warehouse
   * @param partition partition directory name, e.g. {@code p=p1}
   * @throws Exception if the copy fails or reports failure
   */
  private void copyPartitionToMsckTable(FileSystem fs, String sourceTableDir, String partition) throws Exception {
    copyDirInto(fs, warehousePartitionDir(sourceTableDir, partition), warehouseTableDir(acidTblPartMsck),
        false, true);
  }

  /**
   * Copies {@code source} into {@code targetParent} on the same file system and fails the test
   * when {@link FileUtil#copy} reports that the copy did not succeed.
   * @param fs file system used as both source and destination
   * @param source file or directory to copy
   * @param targetParent destination passed to the copy
   * @param deleteSource whether the source is removed after the copy
   * @param overwrite whether existing destination files may be overwritten
   * @throws Exception if the copy throws
   */
  private void copyDirInto(FileSystem fs, Path source, Path targetParent, boolean deleteSource, boolean overwrite)
      throws Exception {
    boolean copied = FileUtil.copy(fs, source, fs, targetParent, deleteSource, overwrite, hiveConf);
    Assert.assertTrue("Failed to copy " + source + " to " + targetParent, copied);
  }

  /**
   * Lists {@code dir} and asserts the number of direct children.
   * @param fs file system to list on
   * @param dir directory to list
   * @param expectedCount expected number of entries
   * @return the listed entries
   * @throws Exception if the listing fails
   */
  private FileStatus[] assertDirEntryCount(FileSystem fs, Path dir, int expectedCount) throws Exception {
    FileStatus[] entries = fs.listStatus(dir);
    Assert.assertEquals("Unexpected number of entries in " + dir, expectedCount, entries.length);
    return entries;
  }

  /** Returns the lower-cased name of {@code Table.ACIDTBLPART}, used as its directory name in the warehouse. */
  private String acidTblPartDirName() {
    return Table.ACIDTBLPART.toString().toLowerCase();
  }

  /** Returns the path of the given table directory inside the warehouse directory. */
  private Path warehouseTableDir(String tableDir) {
    return new Path(getWarehouseDir(), tableDir);
  }

  /** Returns the path of the given partition directory of a table inside the warehouse directory. */
  private Path warehousePartitionDir(String tableDir, String partition) {
    return new Path(getWarehouseDir() + "/" + tableDir + "/" + partition);
  }
}