blob: e599c26c52d7b76df0eec4180526743ac4048768 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import java.io.File;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.queryserver.server.QueryServer;
import org.apache.phoenix.util.InstanceResolver;
import org.apache.phoenix.util.ThinClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
public class QueryServerTestUtil {
private static final Logger LOG = LoggerFactory.getLogger(QueryServerTestUtil.class);
private final Configuration conf;
private final HBaseTestingUtility util;
private LocalHBaseCluster hbase;
private final QueryServer pqs;
private int port;
private String url;
private String principal;
private File keytab;
private ExecutorService executor;
public QueryServerTestUtil(Configuration conf) {
this.conf = Objects.requireNonNull(conf);
this.util = new HBaseTestingUtility(conf);
this.pqs = new QueryServer(new String[0], conf);
}
public QueryServerTestUtil(Configuration conf, String principal, File keytab) {
this.conf = Objects.requireNonNull(conf);
this.principal = principal;
this.keytab = keytab;
this.util = new HBaseTestingUtility(conf);
this.pqs = new QueryServer(new String[0], conf);
}
public void startLocalHBaseCluster(Class testClass) throws Exception {
startLocalHBaseCluster(testClass.getCanonicalName());
}
public void startLocalHBaseCluster(String uniqueName) throws Exception {
LOG.debug("Starting local HBase cluster for '{}'", uniqueName);
// Start ZK
util.startMiniZKCluster();
// Start HDFS
util.startMiniDFSCluster(1);
// Start HBase
Path rootdir = util.getDataTestDirOnTestFS(uniqueName);
FSUtils.setRootDir(conf, rootdir);
hbase = new LocalHBaseCluster(conf, 1);
hbase.startup();
}
public void stopLocalHBaseCluster() throws Exception {
LOG.debug("Stopping local HBase cluster");
if (hbase != null) {
hbase.shutdown();
hbase.join();
}
if (util != null) {
util.shutdownMiniDFSCluster();
util.shutdownMiniZKCluster();
}
}
public void startQueryServer() throws Exception {
setupQueryServerConfiguration(conf);
executor = Executors.newSingleThreadExecutor();
if (!Strings.isNullOrEmpty(principal) && null != keytab) {
// Get the PQS ident for PQS to use
final UserGroupInformation ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal, keytab.getAbsolutePath());
// Launch PQS, doing in the Kerberos login instead of letting PQS do it itself (which would
// break the HBase/HDFS logins also running in the same test case).
executor.submit(new Runnable() {
@Override
public void run() {
ugi.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
pqs.run();
return null;
}
});
}
});
} else {
// Launch PQS without a login
executor.submit(new Runnable() {
@Override
public void run() {
pqs.run();
}
});
}
pqs.awaitRunning();
port = pqs.getPort();
url = ThinClientUtil.getConnectionUrl("localhost", port);
}
public void stopQueryServer() throws Exception {
if (pqs != null) {
pqs.stop();
}
if (executor != null) {
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.info("PQS didn't exit in 5 seconds, proceeding anyways.");
}
}
}
public static void setupQueryServerConfiguration(final Configuration conf) {
// Make sure the ConnectionInfo doesn't try to pull a default Configuration
InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public Configuration getConfiguration(Configuration confToClone) {
Configuration copy = new Configuration(conf);
copy.addResource(confToClone);
return copy;
}
});
}
public int getPort() {
return port;
}
public String getUrl() {
return url;
}
/**
* Returns the query server URL with the specified URL params
* @param params URL params
* @return URL with params
*/
public String getUrl(Map<String, String> params) {
if (params == null || params.size() == 0) {
return url;
}
StringBuilder urlParams = new StringBuilder();
for (Map.Entry<String, String> param : params.entrySet()) {
urlParams.append(";").append(param.getKey()).append("=").append(param.getValue());
}
return url + urlParams;
}
}