/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
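/**
 * Tests for {@link ReplChangeManager}, the metastore change manager (CM) used by
 * replication. When CM is enabled, files removed by drop/replace operations are
 * preserved under the configured cmroot directory, addressed by file name and
 * checksum, and a scheduled clearer later purges entries that have outlived the
 * retention period. The tests below exercise recycling for partitioned and
 * non-partitioned tables, the clearer, and recycling under impersonated users.
 */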
public class TestReplChangeManager {
private static HiveMetaStoreClient client;
private static HiveConf hiveConf;
private static Warehouse warehouse;
private static MiniDFSCluster m_dfs;
private static String cmroot;
private static FileSystem fs;
private static HiveConf permhiveConf;
private static Warehouse permWarehouse;
private static MiniDFSCluster permDdfs;
private static String permCmroot;
@BeforeClass
public static void setUp() throws Exception {
internalSetUp();
internalSetUpProvidePerm();
try {
client = new HiveMetaStoreClient(hiveConf);
} catch (Throwable e) {
System.err.println("Unable to open the metastore");
System.err.println(StringUtils.stringifyException(e));
throw e;
}
}
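// Second MiniDFSCluster with dfs.permissions disabled, used by the
// impersonation-with-access test so a proxy user can write into cmroot.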
private static void internalSetUpProvidePerm() throws Exception {
Configuration configuration = new Configuration();
configuration.set("dfs.permissions.enabled", "false");
String noPermBaseDir = Files.createTempDirectory("noPerm").toFile().getAbsolutePath();
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, noPermBaseDir);
configuration.set("dfs.client.use.datanode.hostname", "true");
permDdfs = new MiniDFSCluster.Builder(configuration).numDataNodes(2).format(true).build();
permhiveConf = new HiveConf(TestReplChangeManager.class);
permhiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
"hdfs://" + permDdfs.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
permhiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
permCmroot = "hdfs://" + permDdfs.getNameNode().getHostAndPort() + "/cmroot";
permhiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, permCmroot);
permhiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
permWarehouse = new Warehouse(permhiveConf);
}
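// Primary MiniDFSCluster with CM enabled and cmroot pointing at /cmroot; most
// tests run against this cluster through the shared hiveConf and warehouse.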
private static void internalSetUp() throws Exception {
m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(2).format(true).build();
hiveConf = new HiveConf(TestReplChangeManager.class);
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
"hdfs://" + m_dfs.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
cmroot = "hdfs://" + m_dfs.getNameNode().getHostAndPort() + "/cmroot";
hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
warehouse = new Warehouse(hiveConf);
fs = new Path(cmroot).getFileSystem(hiveConf);
}
@AfterClass
public static void tearDown() {
try {
m_dfs.shutdown();
permDdfs.shutdown();
client.close();
} catch (Throwable e) {
System.err.println("Unable to close metastore");
System.err.println(StringUtils.stringifyException(e));
throw e;
}
}
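// Builds an in-memory Partition object with an ORC storage descriptor for the
// given db/table and partition values.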
private Partition createPartition(String dbName, String tblName,
List<FieldSchema> columns, List<String> partVals, SerDeInfo serdeInfo) {
StorageDescriptor sd = new StorageDescriptor(columns, null,
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
false, 0, serdeInfo, null, null, null);
return new Partition(partVals, dbName, tblName, 0, 0, sd, null);
}
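// Writes the given content to a new file at path on the warehouse file system.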
private void createFile(Path path, String content) throws IOException {
FSDataOutputStream output = path.getFileSystem(hiveConf).create(path);
output.writeChars(content);
output.close();
}
@Test
public void testRecyclePartTable() throws Exception {
// Create db1/t1/dt=20160101/part
//              /dt=20160102/part
//              /dt=20160103/part
// Test: recycle single file (dt=20160101/part)
//       recycle single partition (dt=20160102)
//       recycle table t1
String dbName = "db1";
client.dropDatabase(dbName, true, true);
Database db = new Database();
db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
db.setName(dbName);
client.createDatabase(db);
String tblName = "t1";
List<FieldSchema> columns = new ArrayList<FieldSchema>();
columns.add(new FieldSchema("foo", "string", ""));
columns.add(new FieldSchema("bar", "string", ""));
List<FieldSchema> partColumns = new ArrayList<FieldSchema>();
partColumns.add(new FieldSchema("dt", "string", ""));
SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap<String, String>());
StorageDescriptor sd
= new StorageDescriptor(columns, null,
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
false, 0, serdeInfo, null, null, null);
Map<String, String> tableParameters = new HashMap<String, String>();
Table tbl = new Table(tblName, dbName, "", 0, 0, 0, sd, partColumns, tableParameters, "", "", "");
client.createTable(tbl);
List<String> values = Arrays.asList("20160101");
Partition part1 = createPartition(dbName, tblName, columns, values, serdeInfo);
client.add_partition(part1);
values = Arrays.asList("20160102");
Partition part2 = createPartition(dbName, tblName, columns, values, serdeInfo);
client.add_partition(part2);
values = Arrays.asList("20160103");
Partition part3 = createPartition(dbName, tblName, columns, values, serdeInfo);
client.add_partition(part3);
Path part1Path = new Path(warehouse.getDefaultPartitionPath(db, tbl, ImmutableMap.of("dt", "20160101")), "part");
createFile(part1Path, "p1");
String path1Chksum = ReplChangeManager.checksumFor(part1Path, fs);
Path part2Path = new Path(warehouse.getDefaultPartitionPath(db, tbl, ImmutableMap.of("dt", "20160102")), "part");
createFile(part2Path, "p2");
String path2Chksum = ReplChangeManager.checksumFor(part2Path, fs);
Path part3Path = new Path(warehouse.getDefaultPartitionPath(db, tbl, ImmutableMap.of("dt", "20160103")), "part");
createFile(part3Path, "p3");
String path3Chksum = ReplChangeManager.checksumFor(part3Path, fs);
assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path));
ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf);
// verify cm.recycle(db, table, part) api moves file to cmroot dir
int ret = cm.recycle(part1Path, RecycleType.MOVE, false);
Assert.assertEquals(1, ret);
Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, part1Path.getName(), path1Chksum, cmroot.toString());
assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path));
// Verify dropPartition recycle part files
client.dropPartition(dbName, tblName, Arrays.asList("20160102"));
assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, part2Path.getName(), path2Chksum, cmroot.toString());
assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path));
// Verify dropTable recycle partition files
client.dropTable(dbName, tblName);
assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, part3Path.getName(), path3Chksum, cmroot.toString());
assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path));
client.dropDatabase(dbName, true, true);
}
@Test
public void testRecycleNonPartTable() throws Exception {
// Create db2/t1/part1
//              /part2
//              /part3
// Test: recycle single file (part1)
//       recycle table t1
String dbName = "db2";
client.dropDatabase(dbName, true, true);
Database db = new Database();
db.putToParameters(SOURCE_OF_REPLICATION, "1, 2, 3");
db.setName(dbName);
client.createDatabase(db);
String tblName = "t1";
List<FieldSchema> columns = new ArrayList<FieldSchema>();
columns.add(new FieldSchema("foo", "string", ""));
columns.add(new FieldSchema("bar", "string", ""));
SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap<String, String>());
StorageDescriptor sd
= new StorageDescriptor(columns, null,
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
false, 0, serdeInfo, null, null, null);
Map<String, String> tableParameters = new HashMap<String, String>();
Table tbl = new Table(tblName, dbName, "", 0, 0, 0, sd, null, tableParameters, "", "", "");
client.createTable(tbl);
Path filePath1 = new Path(warehouse.getDefaultTablePath(db, tblName), "part1");
createFile(filePath1, "f1");
String fileChksum1 = ReplChangeManager.checksumFor(filePath1, fs);
Path filePath2 = new Path(warehouse.getDefaultTablePath(db, tblName), "part2");
createFile(filePath2, "f2");
String fileChksum2 = ReplChangeManager.checksumFor(filePath2, fs);
Path filePath3 = new Path(warehouse.getDefaultTablePath(db, tblName), "part3");
createFile(filePath3, "f3");
String fileChksum3 = ReplChangeManager.checksumFor(filePath3, fs);
assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3));
ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf);
// verify cm.recycle(Path) api moves file to cmroot dir
cm.recycle(filePath1, RecycleType.MOVE, false);
assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1));
Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, filePath1.getName(), fileChksum1, cmroot.toString());
assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1));
// Verify dropTable recycle table files
client.dropTable(dbName, tblName);
Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, filePath2.getName(), fileChksum2, cmroot.toString());
assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2));
Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, filePath3.getName(), fileChksum3, cmroot.toString());
assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3));
client.dropDatabase(dbName, true, true);
}
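// Recycles three table directories into cmroot, backdates the CM copies of four
// of the files by two weeks, then schedules the CM clearer and polls until the
// expired entries are gone while the recently recycled ones remain.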
@Test
public void testClearer() throws Exception {
FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf);
long now = System.currentTimeMillis();
Path dirDb = new Path(warehouse.getWhRoot(), "db3");
fs.delete(dirDb, true);
fs.mkdirs(dirDb);
Path dirTbl1 = new Path(dirDb, "tbl1");
fs.mkdirs(dirTbl1);
Path part11 = new Path(dirTbl1, "part1");
createFile(part11, "testClearer11");
String fileChksum11 = ReplChangeManager.checksumFor(part11, fs);
Path part12 = new Path(dirTbl1, "part2");
createFile(part12, "testClearer12");
String fileChksum12 = ReplChangeManager.checksumFor(part12, fs);
Path dirTbl2 = new Path(dirDb, "tbl2");
fs.mkdirs(dirTbl2);
Path part21 = new Path(dirTbl2, "part1");
createFile(part21, "testClearer21");
String fileChksum21 = ReplChangeManager.checksumFor(part21, fs);
Path part22 = new Path(dirTbl2, "part2");
createFile(part22, "testClearer22");
String fileChksum22 = ReplChangeManager.checksumFor(part22, fs);
Path dirTbl3 = new Path(dirDb, "tbl3");
fs.mkdirs(dirTbl3);
Path part31 = new Path(dirTbl3, "part1");
createFile(part31, "testClearer31");
String fileChksum31 = ReplChangeManager.checksumFor(part31, fs);
Path part32 = new Path(dirTbl3, "part2");
createFile(part32, "testClearer32");
String fileChksum32 = ReplChangeManager.checksumFor(part32, fs);
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, RecycleType.MOVE, false);
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, RecycleType.MOVE, false);
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, RecycleType.MOVE, true);
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString())));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot.toString())));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString())));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot.toString())));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString())));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString())));
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString()),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString()),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString()),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
ReplChangeManager.scheduleCMClearer(hiveConf);
long start = System.currentTimeMillis();
long end;
boolean cleared = false;
do {
Thread.sleep(200);
end = System.currentTimeMillis();
if (end - start > 5000) {
Assert.fail("timeout, cmroot has not been cleared");
}
if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString())) &&
fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot.toString())) &&
!fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString())) &&
fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot.toString())) &&
!fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString())) &&
!fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()))) {
cleared = true;
}
} while (!cleared);
}
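// Recycling as proxy user "impala", which owns the data directory but has no
// access to cmroot, must fail with AccessControlException; recycling as the test
// user then succeeds and the clearer purges the backdated CM copy.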
@Test
public void testRecycleUsingImpersonation() throws Exception {
FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf);
Path dirDb = new Path(warehouse.getWhRoot(), "db3");
long now = System.currentTimeMillis();
fs.delete(dirDb, true);
fs.mkdirs(dirDb);
Path dirTbl1 = new Path(dirDb, "tbl1");
fs.mkdirs(dirTbl1);
Path part11 = new Path(dirTbl1, "part1");
createFile(part11, "testClearer11");
String fileChksum11 = ReplChangeManager.checksumFor(part11, fs);
Path part12 = new Path(dirTbl1, "part2");
createFile(part12, "testClearer12");
String fileChksum12 = ReplChangeManager.checksumFor(part12, fs);
final UserGroupInformation proxyUserUgi =
UserGroupInformation.createRemoteUser("impala");
setGroupsInConf(UserGroupInformation.getCurrentUser().getGroupNames(),
"impala", hiveConf);
//set owner of data path to impala
fs.setOwner(dirTbl1, "impala", "default");
fs.setOwner(part11, "impala", "default");
fs.setOwner(part12, "impala", "default");
proxyUserUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
try {
// impala doesn't have access to cmroot, so recycle should throw an AccessControlException
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, RecycleType.MOVE, false);
Assert.fail();
} catch (AccessControlException e) {
assertTrue(e.getMessage().contains("Permission denied: user=impala, access=EXECUTE"));
assertTrue(e.getMessage().contains("/cmroot"));
}
return null;
});
ReplChangeManager.getInstance().recycle(dirTbl1, RecycleType.MOVE, false);
Assert.assertFalse(fs.exists(part11));
Assert.assertFalse(fs.exists(part12));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot)));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot)));
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
ReplChangeManager.scheduleCMClearer(hiveConf);
long start = System.currentTimeMillis();
long end;
boolean cleared = false;
do {
Thread.sleep(200);
end = System.currentTimeMillis();
if (end - start > 5000) {
Assert.fail("timeout, cmroot has not been cleared");
}
if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot)) &&
fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot))) {
cleared = true;
}
} while (!cleared);
}
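// Proxy user "impala2" owns the data directory and, through membership in
// "supergroup", has sufficient access to cmroot, so recycling under impersonation
// succeeds and the backdated CM copy is later cleared.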
@Test
public void testRecycleImpersonationWithGroupPermissions() throws Exception {
FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf);
Path dirDb = new Path(warehouse.getWhRoot(), "db3");
long now = System.currentTimeMillis();
fs.delete(dirDb, true);
fs.mkdirs(dirDb);
Path dirTbl2 = new Path(dirDb, "tbl2");
fs.mkdirs(dirTbl2);
Path part21 = new Path(dirTbl2, "part1");
createFile(part21, "testClearer21");
String fileChksum21 = ReplChangeManager.checksumFor(part21, fs);
Path part22 = new Path(dirTbl2, "part2");
createFile(part22, "testClearer22");
String fileChksum22 = ReplChangeManager.checksumFor(part22, fs);
final UserGroupInformation proxyUserUgi =
UserGroupInformation.createUserForTesting("impala2", new String[] {"supergroup"});
//set owner of data path to impala2
fs.setOwner(dirTbl2, "impala2", "default");
fs.setOwner(part21, "impala2", "default");
fs.setOwner(part22, "impala2", "default");
proxyUserUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
try {
//impala2 doesn't have access but it belongs to a group which does.
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, RecycleType.MOVE, false);
} catch (Exception e) {
Assert.fail();
}
return null;
});
Assert.assertFalse(fs.exists(part21));
Assert.assertFalse(fs.exists(part22));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot)));
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot)));
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
ReplChangeManager.scheduleCMClearer(hiveConf);
long start = System.currentTimeMillis();
long end;
boolean cleared = false;
do {
Thread.sleep(200);
end = System.currentTimeMillis();
if (end - start > 5000) {
Assert.fail("timeout, cmroot has not been cleared");
}
if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot)) &&
fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot))) {
cleared = true;
}
} while (!cleared);
}
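// Same impersonation scenario, but against the cluster with dfs.permissions
// disabled: the proxy user "impala" can recycle into cmroot, and the CMClearer
// (running as the Hive user) purges the backdated Impala-owned CM copy.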
@Test
public void testRecycleUsingImpersonationWithAccess() throws Exception {
try {
ReplChangeManager.resetReplChangeManagerInstance();
ReplChangeManager.getInstance(permhiveConf);
FileSystem fs = permWarehouse.getWhRoot().getFileSystem(permhiveConf);
long now = System.currentTimeMillis();
Path dirDb = new Path(permWarehouse.getWhRoot(), "db3");
fs.delete(dirDb, true);
fs.mkdirs(dirDb);
Path dirTbl1 = new Path(dirDb, "tbl1");
fs.mkdirs(dirTbl1);
Path part11 = new Path(dirTbl1, "part1");
createFile(part11, "testClearer11");
String fileChksum11 = ReplChangeManager.checksumFor(part11, fs);
Path part12 = new Path(dirTbl1, "part2");
createFile(part12, "testClearer12");
String fileChksum12 = ReplChangeManager.checksumFor(part12, fs);
final UserGroupInformation proxyUserUgi =
UserGroupInformation.createRemoteUser("impala");
setGroupsInConf(UserGroupInformation.getCurrentUser().getGroupNames(),
"impala", permhiveConf);
//set owner of data path to impala
fs.setOwner(dirTbl1, "impala", "default");
proxyUserUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
ReplChangeManager.getInstance().recycle(dirTbl1, RecycleType.MOVE, false);
return null;
});
Assert.assertFalse(fs.exists(part11));
Assert.assertFalse(fs.exists(part12));
assertTrue(fs.exists(ReplChangeManager.getCMPath(permhiveConf, part11.getName(), fileChksum11, permCmroot)));
assertTrue(fs.exists(ReplChangeManager.getCMPath(permhiveConf, part12.getName(), fileChksum12, permCmroot)));
fs.setTimes(ReplChangeManager.getCMPath(permhiveConf, part11.getName(), fileChksum11, permCmroot),
now - 7 * 86400 * 1000 * 2, now - 7 * 86400 * 1000 * 2);
ReplChangeManager.scheduleCMClearer(permhiveConf);
long start = System.currentTimeMillis();
long end;
boolean cleared = false;
do {
Thread.sleep(200);
end = System.currentTimeMillis();
if (end - start > 5000) {
Assert.fail("timeout, cmroot has not been cleared");
}
//Impala owned file is cleared by Hive CMClearer
if (!fs.exists(ReplChangeManager.getCMPath(permhiveConf, part11.getName(), fileChksum11, permCmroot)) &&
fs.exists(ReplChangeManager.getCMPath(permhiveConf, part12.getName(), fileChksum12, permCmroot))) {
cleared = true;
}
} while (!cleared);
} finally {
ReplChangeManager.resetReplChangeManagerInstance();
ReplChangeManager.getInstance(hiveConf);
}
}
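// Configures proxy-user (superuser) group and host settings for proxyUserName
// and refreshes ProxyUsers so the impersonation used by the tests is authorized.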
private void setGroupsInConf(String[] groupNames, String proxyUserName, Configuration conf)
throws IOException {
conf.set(
DefaultImpersonationProvider.getTestProvider().getProxySuperuserGroupConfKey(proxyUserName),
StringUtils.join(",", Arrays.asList(groupNames)));
configureSuperUserIPAddresses(conf, proxyUserName);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
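// Collects the addresses of all local network interfaces, plus 127.0.1.1 and the
// local canonical hostname, into the proxy superuser host config key.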
private void configureSuperUserIPAddresses(Configuration conf,
String superUserShortName) throws IOException {
List<String> ipList = new ArrayList<String>();
Enumeration<NetworkInterface> netInterfaceList = NetworkInterface
.getNetworkInterfaces();
while (netInterfaceList.hasMoreElements()) {
NetworkInterface inf = netInterfaceList.nextElement();
Enumeration<InetAddress> addrList = inf.getInetAddresses();
while (addrList.hasMoreElements()) {
InetAddress addr = addrList.nextElement();
ipList.add(addr.getHostAddress());
}
}
StringBuilder builder = new StringBuilder();
for (String ip : ipList) {
builder.append(ip);
builder.append(',');
}
builder.append("127.0.1.1,");
builder.append(InetAddress.getLocalHost().getCanonicalHostName());
conf.setStrings(DefaultImpersonationProvider.getTestProvider().getProxySuperuserIpConfKey(superUserShortName),
builder.toString());
}
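// CM file URIs carry the source file checksum as a URI fragment ("#<checksum>");
// isCMFileUri should recognize such paths and reject plain file paths.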
@Test
public void shouldIdentifyCMURIs() {
assertTrue(ReplChangeManager
.isCMFileUri(new Path("hdfs://localhost:90000/somepath/adir/", "ab.jar#e239s2233")));
assertFalse(ReplChangeManager
.isCMFileUri(new Path("/somepath/adir/", "ab.jar")));
}
}