/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatClient.DropDBMode;
import org.apache.hive.hcatalog.api.HCatCreateDBDesc;
import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
import org.apache.hive.hcatalog.api.HCatPartition;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema.Type;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
import org.junit.Assert;
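
/**
 * Helper for Oozie tests that need an HCatalog/Hive metastore. In {@link RUNMODE#LOCAL} mode it
 * configures an embedded, in-process metastore; in {@link RUNMODE#SERVER} mode it starts a
 * standalone Thrift metastore server on a random port. It also offers convenience methods to
 * create and drop databases, tables and partitions, and to build {@code hcat://} URIs for them.
 * <p>
 * A sketch of typical usage in a test (the database, table and location names and the
 * {@code jobConf} Configuration are illustrative):
 * <pre>{@code
 * MiniHCatServer hcat = new MiniHCatServer(RUNMODE.SERVER, jobConf);
 * hcat.start();
 * hcat.createDatabase("testdb", "/tmp/testdb");
 * hcat.createTable("testdb", "clicks", "dt,region");
 * URI tableURI = hcat.getHCatURI("testdb", "clicks");
 * // ... exercise the code under test against the metastore ...
 * hcat.dropDatabase("testdb", true);
 * hcat.shutdown();
 * }</pre>
 */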
public class MiniHCatServer {

    public static enum RUNMODE {
        LOCAL, SERVER
    }

    private static XLog LOG = XLog.getLog(MiniHCatServer.class);
    private static final Random RANDOM = new Random();
    private RUNMODE mode;
    private Configuration hadoopConf;
    private int msPort;
    private HiveConf hiveConf;
    private HCatClient hcatClient;
    private Thread serverThread;
    // Original values of the system properties overridden by this class, restored on shutdown.
    private Map<String, String> sysProps;

    public MiniHCatServer(RUNMODE mode, Configuration hadoopConf) throws Exception {
        this.mode = mode;
        this.hadoopConf = hadoopConf;
        sysProps = new HashMap<String, String>();
    }
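
    /**
     * Starts the metastore. In {@code LOCAL} mode only an embedded (in-process) metastore
     * configuration is prepared; in {@code SERVER} mode a Thrift metastore server is started
     * on a random port in the 30000-30099 range. In both cases an {@link HCatClient} is
     * created against the resulting configuration.
     */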
    public void start() throws Exception {
        if (mode.equals(RUNMODE.LOCAL)) {
            initLocalMetastoreConf();
        }
        else {
            this.msPort = RANDOM.nextInt(100) + 30000;
            startMetastoreServer();
            initMetastoreServerConf();
        }
        hcatClient = HCatClient.create(hiveConf);
    }
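
    /**
     * Closes the {@link HCatClient}, restores any system properties changed by this class and,
     * in {@code SERVER} mode, stops the metastore server thread.
     */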
    public void shutdown() throws Exception {
        resetSystemProperties();
        hcatClient.close();
        if (mode.equals(RUNMODE.SERVER)) {
            // No clean way to stop hive metastore server.
            serverThread.stop();
        }
    }

    private void initLocalMetastoreConf() throws IOException {
        hiveConf = new HiveConf(hadoopConf, this.getClass());
        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new File("target/warehouse").getAbsolutePath());
        hiveConf.set("hive.metastore.local", "true"); // For hive 0.9
        hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:target/metastore_db;create=true");
        setSystemProperty("hive.metastore.local", "true");
        setSystemProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new File("target/warehouse").getAbsolutePath());
        setSystemProperty(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
                "jdbc:derby:target/metastore_db;create=true");
        File derbyLogFile = new File("target/derby.log");
        derbyLogFile.createNewFile();
        setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
    }

    private void initMetastoreServerConf() throws Exception {
        hiveConf = new HiveConf(hadoopConf, this.getClass());
        hiveConf.set("hive.metastore.local", "false"); // For hive 0.9
        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    }
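
    /**
     * Starts a Thrift metastore server, backed by an embedded Derby database under
     * {@code target/metastore_db}, in a background daemon thread and waits a fixed amount of
     * time for it to come up.
     */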
    private void startMetastoreServer() throws Exception {
        final HiveConf serverConf = new HiveConf(hadoopConf, this.getClass());
        serverConf.set("hive.metastore.local", "false");
        serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:target/metastore_db;create=true");
        //serverConf.set(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName());
        File derbyLogFile = new File("target/derby.log");
        derbyLogFile.createNewFile();
        setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
        serverThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    HiveMetaStore.startMetaStore(msPort, ShimLoader.getHadoopThriftAuthBridge(), serverConf);
                    LOG.info("Started metastore server on port " + msPort);
                }
                catch (Throwable e) {
                    LOG.error("Metastore Thrift Server threw an exception...", e);
                }
            }
        });
        serverThread.setDaemon(true);
        serverThread.start();
        // Give the metastore server time to start up before clients try to connect.
        Thread.sleep(10000L);
    }

    public static void resetHiveConfStaticVariables() throws Exception {
        HiveConf.setHiveSiteLocation(HiveConf.class.getClassLoader().getResource("hive-site.xml"));
    }
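
    /**
     * Sets (or clears, if {@code value} is null) a system property, remembering its original
     * value so that {@link #resetSystemProperties()} can restore it later.
     */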
    private void setSystemProperty(String name, String value) {
        if (!sysProps.containsKey(name)) {
            String currentValue = System.getProperty(name);
            sysProps.put(name, currentValue);
        }
        if (value != null) {
            System.setProperty(name, value);
        }
        else {
            System.getProperties().remove(name);
        }
    }

    private void resetSystemProperties() {
        for (Map.Entry<String, String> entry : sysProps.entrySet()) {
            if (entry.getValue() != null) {
                System.setProperty(entry.getKey(), entry.getValue());
            }
            else {
                System.getProperties().remove(entry.getKey());
            }
        }
        sysProps.clear();
    }

    public Configuration getMetaStoreConf() {
        return hiveConf;
    }

    public String getMetastoreAuthority() {
        if (mode.equals(RUNMODE.SERVER)) {
            return "localhost:" + msPort;
        }
        else {
            return "unittest-local";
        }
    }

    public String getMetastoreURI() {
        return hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname);
    }

    public HCatClient getHCatClient() {
        return hcatClient;
    }
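
    /**
     * Builds an {@code hcat://} URI of the form
     * {@code hcat://<authority>/<db>/<table>/<partitions>} pointing at this metastore.
     */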
    public URI getHCatURI(String db, String table, String partitions) throws URISyntaxException {
        StringBuilder uri = new StringBuilder();
        uri.append("hcat://").append(getMetastoreAuthority()).append("/").append(db).append("/").append(table)
                .append("/").append(partitions);
        return new URI(uri.toString());
    }

    public URI getHCatURI(String db, String table) throws URISyntaxException {
        StringBuilder uri = new StringBuilder();
        uri.append("hcat://").append(getMetastoreAuthority()).append("/").append(db).append("/").append(table);
        return new URI(uri.toString());
    }

    public void createDatabase(String db, String location) throws Exception {
        HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(db).ifNotExists(true).location(location).build();
        hcatClient.createDatabase(dbDesc);
        List<String> dbNames = hcatClient.listDatabaseNamesByPattern(db);
        Assert.assertTrue(dbNames.contains(db));
    }
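
    /**
     * Creates a text-format table with a fixed column set (userid, viewtime, pageurl, ip) and
     * the given comma-separated list of string partition columns.
     */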
    public void createTable(String db, String table, String partitionCols) throws Exception {
        List<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
        cols.add(new HCatFieldSchema("userid", Type.INT, "userid"));
        cols.add(new HCatFieldSchema("viewtime", Type.BIGINT, "view time"));
        cols.add(new HCatFieldSchema("pageurl", Type.STRING, "page url visited"));
        cols.add(new HCatFieldSchema("ip", Type.STRING, "IP Address of the User"));
        ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
        for (String partitionCol : partitionCols.split(",")) {
            ptnCols.add(new HCatFieldSchema(partitionCol, Type.STRING, null));
        }
        // Remove this once NotificationListener is fixed and available in HCat snapshot
        Map<String, String> tblProps = new HashMap<String, String>();
        tblProps.put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, "hcat." + db + "." + table);
        HCatCreateTableDesc tableDesc = HCatCreateTableDesc.create(db, table, cols).fileFormat("textfile")
                .partCols(ptnCols).tblProps(tblProps).build();
        hcatClient.createTable(tableDesc);
        List<String> tables = hcatClient.listTableNamesByPattern(db, "*");
        assertTrue(tables.contains(table));
    }
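
    /**
     * Same as {@link #createTable(String, String, String)} but creates the table without
     * partition columns.
     */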
    public void createTable(String db, String table) throws Exception {
        List<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
        cols.add(new HCatFieldSchema("userid", Type.INT, "userid"));
        cols.add(new HCatFieldSchema("viewtime", Type.BIGINT, "view time"));
        cols.add(new HCatFieldSchema("pageurl", Type.STRING, "page url visited"));
        cols.add(new HCatFieldSchema("ip", Type.STRING, "IP Address of the User"));
        // Remove this once NotificationListener is fixed and available in HCat snapshot
        Map<String, String> tblProps = new HashMap<String, String>();
        tblProps.put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, "hcat." + db + "." + table);
        HCatCreateTableDesc tableDesc = HCatCreateTableDesc.create(db, table, cols).fileFormat("textfile")
                .tblProps(tblProps).build();
        hcatClient.createTable(tableDesc);
        List<String> tables = hcatClient.listTableNamesByPattern(db, "*");
        assertTrue(tables.contains(table));
    }

    public void dropDatabase(String db, boolean ifExists) throws Exception {
        hcatClient.dropDatabase(db, ifExists, DropDBMode.CASCADE);
        List<String> dbNames = hcatClient.listDatabaseNamesByPattern(db);
        assertFalse(dbNames.contains(db));
    }

    public void dropTable(String db, String table, boolean ifExists) throws Exception {
        hcatClient.dropTable(db, table, ifExists);
        List<String> tables = hcatClient.listTableNamesByPattern(db, "*");
        assertFalse(tables.contains(table));
    }

    public String getPartitionDir(String db, String table, String partitionSpec, String dbLocation) throws Exception {
        String dir = dbLocation + "/" + db + "/" + table + "/"
                + partitionSpec.replaceAll(HCatURI.PARTITION_SEPARATOR, "/");
        return dir;
    }

    public String createPartitionDir(String db, String table, String partitionSpec, String dbLocation) throws Exception {
        String dir = getPartitionDir(db, table, partitionSpec, dbLocation);
        FileSystem.get(hadoopConf).mkdirs(new Path(dir));
        return dir;
    }
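
    /**
     * Adds a partition at the given location and asserts that it exists afterwards. The
     * partition spec is a list of {@code key=value} pairs separated by
     * {@link HCatURI#PARTITION_SEPARATOR}.
     */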
    public void addPartition(String db, String table, String partitionSpec, String location) throws Exception {
        String[] parts = partitionSpec.split(HCatURI.PARTITION_SEPARATOR);
        Map<String, String> partitions = new HashMap<String, String>();
        for (String part : parts) {
            String[] split = part.split("=");
            partitions.put(split[0], split[1]);
        }
        HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(db, table, location, partitions).build();
        hcatClient.addPartition(addPtn);
        assertNotNull(hcatClient.getPartition(db, table, partitions));
    }

    public void dropPartition(String db, String table, String partitionSpec) throws Exception {
        String[] parts = partitionSpec.split(HCatURI.PARTITION_SEPARATOR);
        Map<String, String> partitions = new HashMap<String, String>();
        for (String part : parts) {
            String[] split = part.split("=");
            partitions.put(split[0], split[1]);
        }
        hcatClient.dropPartitions(db, table, partitions, false);
    }

    public List<HCatPartition> getPartitions(String db, String table, String partitionSpec) throws Exception {
        String[] parts = partitionSpec.split(HCatURI.PARTITION_SEPARATOR);
        Map<String, String> partitions = new HashMap<String, String>();
        for (String part : parts) {
            String[] split = part.split("=");
            partitions.put(split[0], split[1]);
        }
        return hcatClient.getPartitions(db, table, partitions);
    }
}