/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive.minicluster;

import com.google.common.io.Files;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.Service;
import org.apache.hive.service.server.HiveServer2;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.TimeoutException;
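
/**
 * An embedded HiveServer2 instance for integration tests. The server binds to the configured
 * host and port, keeps its warehouse and a Derby-backed metastore under a temporary folder,
 * and writes a matching hive-site.xml into that folder.
 *
 * <p>Illustrative usage sketch; {@code NoAuthenticationConfiguration} stands in for whatever
 * {@link AuthenticationConfiguration} implementation the test suite provides:</p>
 *
 * <pre>{@code
 * HiveMiniCluster cluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
 * cluster.start();
 * try {
 *   String jdbcUrl = cluster.getUrl(); // e.g. jdbc:hive2://127.0.0.1:10000/default
 *   // run test queries against jdbcUrl
 * } finally {
 *   cluster.stop();
 * }
 * }</pre>
 */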
public class HiveMiniCluster {

  private static final Log LOG = LogFactory.getLog(HiveMiniCluster.class.getName());

  private static final String DEFAULT_HOST = "127.0.0.1";
  private static final int DEFAULT_PORT = 10000;

  private final String hostName;
  private final int port;
  private final String tempFolderPath;
  private final AuthenticationConfiguration authenticationConfiguration;
  private final HiveServer2 hiveServer2;

  private HiveConf config;
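
  /**
   * Creates a mini cluster on the default host and port (127.0.0.1:10000), backed by a
   * freshly created temporary directory.
   */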
public HiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) {
this(DEFAULT_HOST, DEFAULT_PORT, authenticationConfiguration);
}
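
  /**
   * Creates a mini cluster on the given host and port, backed by a freshly created
   * temporary directory.
   */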
public HiveMiniCluster(String hostname, int port, AuthenticationConfiguration authenticationConfiguration) {
this(hostname, port, Files.createTempDir().getAbsolutePath(), authenticationConfiguration);
}
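
  /**
   * Creates a mini cluster on the given host and port that keeps its warehouse, metastore
   * database and hive-site.xml under {@code tempFolderPath}.
   */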
public HiveMiniCluster(String hostname, int port, String tempFolderPath, AuthenticationConfiguration authenticationConfiguration) {
this.hostName = hostname;
this.port = port;
this.tempFolderPath = tempFolderPath;
this.authenticationConfiguration = authenticationConfiguration;
this.hiveServer2 = new HiveServer2();
}
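
  /**
   * Builds the HiveConf for the embedded server: the warehouse directory and Derby metastore
   * under the temp folder, the Thrift bind host and port, plus any settings supplied by the
   * {@link AuthenticationConfiguration}.
   */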
private void createHiveConf() {
config = new HiveConf();
config.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, tempFolderPath);
config.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getHostName());
config.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, getPort());
config.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, getMetastoreConnectUrl());
for (Map.Entry<String, String> authConfig : authenticationConfiguration.getAuthenticationConfig().entrySet()) {
config.set(authConfig.getKey(), authConfig.getValue());
}
}
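
  /**
   * Starts the embedded HiveServer2: initializes authentication, builds the configuration,
   * writes hive-site.xml and blocks until the Thrift port accepts connections.
   */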
public void start() {
try {
authenticationConfiguration.init();
createHiveConf();
createHiveSiteXml();
startHiveServer();
waitForStartUp();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
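
  /**
   * Writes the generated configuration to hive-site.xml in the temp folder and registers it
   * via {@code HiveConf.setHiveSiteLocation(...)} so that HiveConf instances created later in
   * this JVM pick it up.
   */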
private void createHiveSiteXml() throws IOException {
    File hiveSiteXmlFile = new File(tempFolderPath, "hive-site.xml");
try (OutputStream out = new FileOutputStream(hiveSiteXmlFile)) {
config.writeXml(out);
}
HiveConf.setHiveSiteLocation(hiveSiteXmlFile.toURI().toURL());
}
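
  /**
   * Initializes and starts HiveServer2 inside the privileged context provided by the
   * authentication configuration, so that any required login (for example Kerberos) is in
   * place before the server comes up.
   */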
private void startHiveServer() throws Exception {
authenticationConfiguration.doAsAuthenticated(new PrivilegedAction<Void>() {
@Override
public Void run() {
hiveServer2.init(config);
hiveServer2.start();
return null;
}
});
}
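
  /**
   * Stops HiveServer2, clears the registered hive-site.xml location and deletes the
   * temporary folder.
   */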
public void stop() {
hiveServer2.stop();
HiveConf.setHiveSiteLocation(null);
try {
FileUtils.deleteDirectory(new File(tempFolderPath));
} catch (IOException e) {
throw new RuntimeException(e);
}
  }

  public HiveConf getConfig() {
return config;
  }

  public int getPort() {
return port;
  }

  public String getHostName() {
return hostName;
}
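
  /**
   * Returns the JDBC connection string for the default database, including any URL
   * parameters required by the authentication configuration.
   */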
public String getUrl() {
return String.format("jdbc:hive2://%s:%d/default%s", hostName, port, authenticationConfiguration.getUrlParams());
  }

  public String getTempFolderPath() {
return tempFolderPath;
}
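
  /**
   * Returns the JDBC URL of the embedded Derby database used as the Hive metastore.
   */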
public String getMetastoreConnectUrl() {
return String.format("jdbc:derby:;databaseName=%s/minicluster_metastore_db;create=true", tempFolderPath);
  }

  public boolean isStarted() {
return hiveServer2.getServiceState() == Service.STATE.STARTED;
}
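
  /**
   * Polls the Thrift port until HiveServer2 accepts a TCP connection, retrying up to 500
   * times with a 100 ms pause, and fails with a TimeoutException if the server never
   * becomes reachable.
   */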
private void waitForStartUp() throws InterruptedException, TimeoutException {
final int numberOfAttempts = 500;
final long sleepTime = 100;
for (int i = 0; i < numberOfAttempts; ++i) {
try {
LOG.debug("Attempt " + (i + 1) + " to access " + hostName + ":" + port);
new Socket(InetAddress.getByName(hostName), port).close();
return;
} catch (RuntimeException | IOException e) {
LOG.debug("Failed to connect to " + hostName + ":" + port, e);
}
Thread.sleep(sleepTime);
}
    throw new TimeoutException("Couldn't access new server: " + hostName + ":" + port);
}
}