/*
* Copyright 2014 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import junit.framework.JUnit4TestAdapter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
/**
* Tests DDL with a remote metastore service and a second namenode (HIVE-6374)
*
*/
public class TestDDLWithRemoteMetastoreSecondNamenode {
static HiveConf conf;
private static final String Database1Name = "db1_nondefault_nn";
private static final String Database2Name = "db2_nondefault_nn";
private static final String Table1Name = "table1_nondefault_nn";
private static final String Table2Name = "table2_nondefault_nn";
private static final String Table3Name = "table3_nondefault_nn";
private static final String Table4Name = "table4_nondefault_nn";
private static final String Table5Name = "table5_nondefault_nn";
private static final String Table6Name = "table6_nondefault_nn";
private static final String Table7Name = "table7_nondefault_nn";
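// Scratch locations: tmpdir/tmppath live on the default (first) filesystem,
// tmpdirFs2/tmppathFs2 on the second (mini DFS) filesystem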
private static final String tmpdir = System.getProperty("test.tmp.dir");
private static final String tmpdirFs2 = "/" + TestDDLWithRemoteMetastoreSecondNamenode.class.getName();
private static final Path tmppath = new Path(tmpdir);
private static final Path tmppathFs2 = new Path(tmpdirFs2);
private static String fs2Uri;
private static MiniDFSCluster miniDfs = null;
private static Hive db;
private static FileSystem fs, fs2;
private static HiveConf jobConf;
private static IDriver driver;
private static int tests = 0;
private static Boolean isInitialized = false;
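// setUp()/tearDown() emulate @BeforeClass/@AfterClass: the metastore, the driver and the
// second filesystem are initialized once before the first test and torn down after the
// last one, tracked by the "tests" countdown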
@Before
public void setUp() throws Exception {
if (tests > 0) {
return;
}
tests = new JUnit4TestAdapter(this.getClass()).countTestCases();
try {
conf = new HiveConf(ExecDriver.class);
SessionState.start(conf);
// Test with remote metastore service
int port = MetaStoreTestUtils.startMetaStoreWithRetry();
conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
conf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
conf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, new URI(tmppath + "/warehouse").getPath());
// Initialize a second file system (MiniDFSCluster with its own namenode)
// Physical files reside in the local file system under a similar location
jobConf = new HiveConf(conf);
miniDfs = new MiniDFSCluster(new Configuration(), 1, true, null);
fs2 = miniDfs.getFileSystem();
try {
fs2.delete(tmppathFs2, true);
}
catch (IOException e) {
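// Ignore failures here; the path may simply not exist yet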
}
fs2.mkdirs(tmppathFs2);
fs2Uri = fs2.getUri().toString();
jobConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fs2Uri);
driver = DriverFactory.newDriver(jobConf);
fs = FileSystem.get(conf);
if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDirectory()) {
throw new RuntimeException(tmpdir + " exists but is not a directory");
}
if (!fs.exists(tmppath)) {
if (!fs.mkdirs(tmppath)) {
throw new RuntimeException("Could not make scratch directory "
+ tmpdir);
}
}
db = Hive.get(conf);
cleanup();
isInitialized = true;
} catch (Exception e) {
throw new RuntimeException("Encountered exception " + e.getMessage()
+ (e.getCause() == null ? "" : ", caused by: " + e.getCause().getMessage()), e);
}
finally {
if (!isInitialized) {
shutdownMiniDfs();
}
}
}
@After
public void tearDown() throws Exception {
if (--tests == 0) {
cleanup();
shutdownMiniDfs();
}
}
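// Shuts down the MiniDFSCluster that backs the second namenode, if it was started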
private void shutdownMiniDfs() {
if(miniDfs != null) {
miniDfs.shutdown();
}
}
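// Drops any tables and databases created by the tests so every run starts from a clean metastore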
private void cleanup() throws Exception {
String[] srctables = {Table1Name, Table2Name, Database1Name + "." + Table3Name,
Database1Name + "." + Table4Name, Table5Name, Table6Name};
for (String src : srctables) {
driver.run("DROP TABLE IF EXISTS " + src);
}
String[] srcdatabases = {Database1Name, Database2Name};
for (String src : srcdatabases) {
driver.run("DROP DATABASE IF EXISTS " + src + " CASCADE");
}
}
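// Runs a query through the driver; fails the test if no response is returned or execution throws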
private void executeQuery(String query) throws Exception {
try {
CommandProcessorResponse result = driver.run(query);
assertNotNull("driver.run() was expected to return result for query: " + query, result);
} catch (CommandProcessorException e) {
throw new RuntimeException("Execution of (" + query + ") failed with exit status: " +
e.getResponseCode() + ", " + e.getMessage() + ", query: " + query);
}
}
private String buildLocationClause(String location) {
return location == null ? "" : (" LOCATION '" + location + "'");
}
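// Adds a partition via ALTER TABLE ... ADD PARTITION with an optional LOCATION and verifies where it ends up:
// under the table directory on the second filesystem when no location is given, qualified against the
// first filesystem when the location already carries a scheme, and against the second filesystem otherwise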
private void addPartitionAndCheck(Table table, String column,
String value, String location) throws Exception {
executeQuery("ALTER TABLE " + table.getTableName() +
" ADD PARTITION (" + column + "='" + value + "')" +
buildLocationClause(location));
HashMap<String, String> partitionDef1 = new HashMap<String, String>();
partitionDef1.put(column, value);
Partition partition = db.getPartition(table, partitionDef1, false);
assertNotNull("Partition object is expected for " + Table1Name , partition);
String locationActual = partition.getLocation();
if (location == null) {
assertEquals("Partition should be located in the second filesystem",
fs2.makeQualified(new Path(table.getTTable().getSd().getLocation())).toString()
+ "/p=p1", locationActual);
}
else if (new Path(location).toUri().getScheme() != null) {
assertEquals("Partition should be located in the first filesystem",
fs.makeQualified(new Path(location)).toString(), locationActual);
}
else {
assertEquals("Partition should be located in the second filesystem",
fs2.makeQualified(new Path(location)).toString(), locationActual);
}
}
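// Moves a partition via ALTER TABLE ... PARTITION ... SET LOCATION and verifies the new location is
// qualified against the first filesystem when it carries a scheme, and against the second one otherwise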
private void alterPartitionAndCheck(Table table, String column, String value, String location) throws Exception {
assertNotNull(location);
executeQuery("ALTER TABLE " + table.getTableName() +
" PARTITION (" + column + "='" + value + "')" +
" SET LOCATION '" + location + "'");
HashMap<String, String> partitions = new HashMap<String, String>();
partitions.put(column, value);
Partition partition = db.getPartition(table, partitions, false);
assertNotNull("Partition object is expected for " + table.getTableName() , partition);
String locationActual = partition.getLocation();
if (new Path(location).toUri().getScheme() != null) {
assertEquals("Partition should be located in the first filesystem",
fs.makeQualified(new Path(location)).toString(), locationActual);
}
else {
assertEquals("Partition should be located in the second filesystem",
fs2.makeQualified(new Path(location)).toString(), locationActual);
}
}
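// Creates an external partitioned table (or CREATE TABLE LIKE baseTable when one is given) and
// verifies that its location resolves to the second filesystem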
private Table createTableAndCheck(String tableName, String tableLocation)
throws Exception {
return createTableAndCheck(null, tableName, tableLocation);
}
private Table createTableAndCheck(Table baseTable, String tableName, String tableLocation) throws Exception {
executeQuery("CREATE EXTERNAL TABLE " + tableName + (baseTable == null ?
" (col1 string, col2 string) PARTITIONED BY (p string) " :
" LIKE " + baseTable.getTableName())
+ buildLocationClause(tableLocation));
Table table = db.getTable(tableName);
assertNotNull("Table object is expected for " + tableName , table);
String location = table.getTTable().getSd().getLocation();
if (tableLocation != null) {
assertEquals("Table should be located in the second filesystem",
fs2.makeQualified(new Path(tableLocation)).toString(), location);
}
else {
// Since the warehouse path is non-qualified, the table should be located on the second filesystem
assertEquals("Table should be located in the second filesystem",
fs2.getUri().getScheme(), new URI(location).getScheme());
}
return table;
}
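// Creates a database with an optional LOCATION clause and verifies that its location resolves to the second filesystem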
private void createDatabaseAndCheck(String databaseName, String databaseLocation) throws Exception {
executeQuery("CREATE DATABASE " + databaseName + buildLocationClause(databaseLocation));
Database database = db.getDatabase(databaseName);
assertNotNull("Database object is expected for " + databaseName , database);
String location = database.getLocationUri().toString();
if (databaseLocation != null) {
assertEquals("Database should be located in the second filesystem",
fs2.makeQualified(new Path(databaseLocation)).toString(), location);
}
else {
// Since the warehouse path is non-qualified, the database should be located on the second filesystem
assertEquals("Database should be located in the second filesystem",
fs2.getUri().getScheme(), new URI(location).getScheme());
}
}
@Test
public void testAlterPartitionSetLocationNonDefaultNameNode() throws Exception {
assertTrue("Test suite should have been initialized", isInitialized);
String tableLocation = tmppathFs2 + "/" + "test_set_part_loc";
Table table = createTableAndCheck(Table7Name, tableLocation);
addPartitionAndCheck(table, "p", "p1", "/tmp/test/1");
alterPartitionAndCheck(table, "p", "p1", "/tmp/test/2");
}
@Test
public void testCreateDatabaseWithTableNonDefaultNameNode() throws Exception {
assertTrue("Test suite should be initialied", isInitialized );
final String tableLocation = tmppathFs2 + "/" + Table3Name;
final String databaseLocation = tmppathFs2 + "/" + Database1Name;
// Create database in specific location (absolute non-qualified path)
createDatabaseAndCheck(Database1Name, databaseLocation);
// Create database without location clause
createDatabaseAndCheck(Database2Name, null);
// Create table in database in specific location
createTableAndCheck(Database1Name + "." + Table3Name, tableLocation);
// Create table in database without location clause
createTableAndCheck(Database1Name + "." + Table4Name, null);
}
}