blob: 6d663c375789341c8e13b089acfdcee391440ab4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.test.hive;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim;
import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.Service;
import org.apache.hive.service.cli.CLIServiceClient;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
import org.apache.hive.service.server.HiveServer2;
import com.google.common.io.Files;
// TODO: This class and AbstractHiveService are copied from the org.apache.hive.jdbc package in Hive 13.1; we can remove them once
// Hive publishes its "hive-it-unit" artifact to maven.
// TODO: We also had to make some changes to the constructor to get this to work properly; hopefully by the time we pick up MiniHS2
// from Hive itself they'll have fixed it
public class MiniHS2 extends AbstractHiveService {
private static final String driverName = "org.apache.hive.jdbc.HiveDriver";
private HiveServer2 hiveServer2 = null;
private final File baseDir;
private final Path baseDfsDir;
private static final AtomicLong hs2Counter = new AtomicLong();
private static final String HS2_BINARY_MODE = "binary";
private static final String HS2_HTTP_MODE = "http";
private MiniMrShim mr;
private MiniDFSShim dfs;
public MiniHS2(HiveConf hiveConf, FileSystem fs) throws IOException {
// MiniHS2 normally only lets you do either "local" mode or normal mode. We couldn't use "local" mode because it forks out
// a process to run a shell script (that we don't have) to run Hadoop jobs. And we didn't want to use normal mode because that
// creates Mini MR and DFS clusters, which we already have setup for Oozie. Our hacking here involved deleting the Hive Mini
// MR/DFS cluster code and passing in our jobConf in the hiveConf so that HS2 would use our Mini MR/DFS cluster.
super(hiveConf, "localhost", MetaStoreUtils.findFreePort(), MetaStoreUtils.findFreePort());
baseDir = Files.createTempDir();
baseDfsDir = new Path(new Path(fs.getUri()), "/base");
String metaStoreURL = "jdbc:derby:" + baseDir.getAbsolutePath() + File.separator + "test_metastore-" +
hs2Counter.incrementAndGet() + ";create=true";
fs.mkdirs(baseDfsDir);
Path wareHouseDir = new Path(baseDfsDir, "warehouse");
fs.mkdirs(wareHouseDir);
setWareHouseDir(wareHouseDir.toString());
System.setProperty(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL);
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, metaStoreURL);
// reassign a new port, just in case if one of the MR services grabbed the last one
setBinaryPort(MetaStoreUtils.findFreePort());
hiveConf.setVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE, HS2_BINARY_MODE);
hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost());
hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort());
hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort());
Path scratchDir = new Path(baseDfsDir, "scratch");
fs.mkdirs(scratchDir);
System.setProperty(HiveConf.ConfVars.SCRATCHDIR.varname, scratchDir.toString());
System.setProperty(HiveConf.ConfVars.LOCALSCRATCHDIR.varname,
baseDir.getPath() + File.separator + "scratch");
}
public void start(Map<String, String> confOverlay) throws Exception {
hiveServer2 = new HiveServer2();
// Set confOverlay parameters
for (Map.Entry<String, String> entry : confOverlay.entrySet()) {
setConfProperty(entry.getKey(), entry.getValue());
}
hiveServer2.init(getHiveConf());
hiveServer2.start();
waitForStartup();
setStarted(true);
}
public void stop() {
verifyStarted();
hiveServer2.stop();
setStarted(false);
try {
if (mr != null) {
mr.shutdown();
mr = null;
}
if (dfs != null) {
dfs.shutdown();
dfs = null;
}
} catch (IOException e) {
// Ignore errors cleaning up miniMR
}
FileUtils.deleteQuietly(baseDir);
}
public CLIServiceClient getServiceClient() {
verifyStarted();
return getServiceClientInternal();
}
public CLIServiceClient getServiceClientInternal() {
for (Service service : hiveServer2.getServices()) {
if (service instanceof ThriftBinaryCLIService) {
return new ThriftCLIServiceClient((ThriftBinaryCLIService) service);
}
if (service instanceof ThriftHttpCLIService) {
return new ThriftCLIServiceClient((ThriftHttpCLIService) service);
}
}
throw new IllegalStateException("HiveServer2 not running Thrift service");
}
/**
* return connection URL for this server instance
* @return
*/
public String getJdbcURL() {
return getJdbcURL("default");
}
/**
* return connection URL for this server instance
* @param dbName - DB name to be included in the URL
* @return
*/
public String getJdbcURL(String dbName) {
return getJdbcURL(dbName, "");
}
/**
* return connection URL for this server instance
* @param dbName - DB name to be included in the URL
* @param urlExtension - Addional string to be appended to URL
* @return
*/
public String getJdbcURL(String dbName, String urlExtension) {
assertNotNull("URL extension shouldn't be null", urlExtension);
String transportMode = getConfProperty(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
if(transportMode != null && (transportMode.equalsIgnoreCase(HS2_HTTP_MODE))) {
return "jdbc:hive2://" + getHost() + ":" + getHttpPort() + "/" + dbName;
}
else {
return "jdbc:hive2://" + getHost() + ":" + getBinaryPort() + "/" + dbName + urlExtension;
}
}
public static String getJdbcDriverName() {
return driverName;
}
public MiniMrShim getMR() {
return mr;
}
public MiniDFSShim getDFS() {
return dfs;
}
private void waitForStartup() throws Exception {
int waitTime = 0;
long startupTimeout = 1000L * 1000000000L;
CLIServiceClient hs2Client = getServiceClientInternal();
SessionHandle sessionHandle = null;
do {
Thread.sleep(500L);
waitTime += 500L;
if (waitTime > startupTimeout) {
throw new TimeoutException("Couldn't access new HiveServer2: " + getJdbcURL());
}
try {
sessionHandle = hs2Client.openSession("foo", "bar");
} catch (Exception e) {
// service not started yet
continue;
}
hs2Client.closeSession(sessionHandle);
break;
} while (true);
}
}