/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.service.server;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveServer2TransportMode;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreClientWithLocalCache;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll;
import org.apache.hadoop.hive.ql.parse.CalcitePlanner;
import org.apache.hadoop.hive.ql.parse.repl.metric.MetricSink;
import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
import org.apache.hadoop.hive.ql.security.authorization.HiveMetastoreAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.PolicyProviderContainer;
import org.apache.hadoop.hive.ql.security.authorization.PrivilegeSynchronizer;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.http.HttpServer;
import org.apache.hive.http.JdbcJarDownloadServlet;
import org.apache.hive.http.LlapServlet;
import org.apache.hive.http.security.PamAuthenticator;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.saml.HiveSaml2Client;
import org.apache.hive.service.auth.saml.HiveSamlUtils;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
import org.apache.hive.service.servlet.HS2LeadershipStatus;
import org.apache.hive.service.servlet.HS2Peers;
import org.apache.hive.service.servlet.QueriesRESTfulAPIServlet;
import org.apache.hive.service.servlet.QueryProfileServlet;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* HiveServer2.
*
*/
public class HiveServer2 extends CompositeService {
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri";
private static final int SHUTDOWN_TIME = 60;
private static CountDownLatch zkDeleteSignal;
private static volatile KeeperException.Code zkDeleteResultCode;
private CLIService cliService;
private ThriftCLIService thriftCLIService;
private CuratorFramework zKClientForPrivSync = null;
private HttpServer webServer; // Web UI
private TezSessionPoolManager tezSessionPoolManager;
private WorkloadManager wm;
private PamAuthenticator pamAuthenticator;
private Map<String, String> confsToPublish = new HashMap<String, String>();
private String serviceUri;
private boolean serviceDiscovery;
private boolean activePassiveHA;
private LeaderLatchListener leaderLatchListener;
private ExecutorService leaderActionsExecutorService;
private HS2ActivePassiveHARegistry hs2HARegistry;
private Hive sessionHive;
private String wmQueue;
private AtomicBoolean isLeader = new AtomicBoolean(false);
// used for testing
private SettableFuture<Boolean> isLeaderTestFuture = SettableFuture.create();
private SettableFuture<Boolean> notLeaderTestFuture = SettableFuture.create();
private ZooKeeperHiveHelper zooKeeperHelper = null;
private ScheduledQueryExecutionService scheduledQueryService;
public HiveServer2() {
super(HiveServer2.class.getSimpleName());
HiveConf.setLoadHiveServer2Config(true);
}
@VisibleForTesting
public HiveServer2(PamAuthenticator pamAuthenticator) {
super(HiveServer2.class.getSimpleName());
HiveConf.setLoadHiveServer2Config(true);
this.pamAuthenticator = pamAuthenticator;
}
@VisibleForTesting
public CLIService getCliService() {
return this.cliService;
}
@VisibleForTesting
public void setPamAuthenticator(PamAuthenticator pamAuthenticator) {
this.pamAuthenticator = pamAuthenticator;
}
@VisibleForTesting
public SettableFuture<Boolean> getIsLeaderTestFuture() {
return isLeaderTestFuture;
}
@VisibleForTesting
public SettableFuture<Boolean> getNotLeaderTestFuture() {
return notLeaderTestFuture;
}
private void resetIsLeaderTestFuture() {
isLeaderTestFuture = SettableFuture.create();
}
private void resetNotLeaderTestFuture() {
notLeaderTestFuture = SettableFuture.create();
}
@Override
public synchronized void init(HiveConf hiveConf) {
//Initialize metrics first, as some metrics are for initialization stuff.
try {
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
MetricsFactory.init(hiveConf);
}
} catch (Throwable t) {
LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
}
// Do not allow sessions - leader election or initialization will allow them for an active HS2.
cliService = new CLIService(this, false);
addService(cliService);
final HiveServer2 hiveServer2 = this;
boolean isHttpTransportMode = isHttpTransportMode(hiveConf);
boolean isAllTransportMode = isAllTransportMode(hiveConf);
if (isHttpTransportMode || isAllTransportMode) {
thriftCLIService = new ThriftHttpCLIService(cliService);
addService(thriftCLIService);
}
if (!isHttpTransportMode || isAllTransportMode) {
thriftCLIService = new ThriftBinaryCLIService(cliService);
addService(thriftCLIService); // thriftCLIService instance is used for ZooKeeper purposes
}
super.init(hiveConf);
// Set host name in conf
try {
hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getServerHost());
} catch (Throwable t) {
throw new Error("Unable to initialize HiveServer2", t);
}
if (HiveConf.getBoolVar(hiveConf, ConfVars.LLAP_HS2_ENABLE_COORDINATOR)) {
// See method comment.
try {
LlapCoordinator.initializeInstance(hiveConf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// Trigger the creation of LLAP registry client, if in use. Clients may be using a different
// cluster than the default one, but at least for the default case we'd have it covered.
String llapHosts = HiveConf.getVar(hiveConf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
if (llapHosts != null && !llapHosts.isEmpty()) {
LlapRegistryService.getClient(hiveConf);
}
// Initialize metadata provider class and trimmer
CalcitePlanner.warmup();
try {
sessionHive = Hive.get(hiveConf);
} catch (HiveException e) {
throw new RuntimeException("Failed to get metastore connection", e);
}
// Create views registry
HiveMaterializedViewsRegistry.get().init();
StatsSources.initialize(hiveConf);
if (hiveConf.getBoolVar(ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_ENABLED)) {
scheduledQueryService = ScheduledQueryExecutionService.startScheduledQueryExecutorService(hiveConf);
}
// Setup cache if enabled.
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
try {
QueryResultsCache.initialize(hiveConf);
} catch (Exception err) {
throw new RuntimeException("Error initializing the query results cache", err);
}
}
// setup metastore client cache
if (hiveConf.getBoolVar(ConfVars.MSC_CACHE_ENABLED)) {
HiveMetaStoreClientWithLocalCache.init(hiveConf);
}
try {
NotificationEventPoll.initialize(hiveConf);
} catch (Exception err) {
throw new RuntimeException("Error initializing notification event poll", err);
}
wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim();
this.zooKeeperAclProvider = getACLProvider(hiveConf);
this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE);
try {
if (serviceDiscovery) {
serviceUri = getServerInstanceURI();
addConfsToPublish(hiveConf, confsToPublish, serviceUri);
if (activePassiveHA) {
hiveConf.set(INSTANCE_URI_CONFIG, serviceUri);
leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get());
leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Leader Actions Handler Thread").build());
hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false);
}
}
} catch (Exception e) {
throw new ServiceException(e);
}
try {
logCompactionParameters(hiveConf);
maybeStartCompactorThreads(hiveConf);
} catch (Exception e) {
throw new RuntimeException(e);
}
// Setup web UI
final int webUIPort;
final String webHost;
try {
webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT);
webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST);
// We disable web UI in tests unless the test is explicitly setting a
// unique web ui port so that we don't mess up ptests.
boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST) &&
(webUIPort == Integer.valueOf(ConfVars.HIVE_SERVER2_WEBUI_PORT.getDefaultValue()));
if (uiDisabledInTest) {
LOG.info("Web UI is disabled in test mode since webui port was not specified");
} else {
if (webUIPort <= 0) {
LOG.info("Web UI is disabled since port is set to " + webUIPort);
} else {
LOG.info("Starting Web UI on port "+ webUIPort);
HttpServer.Builder builder = new HttpServer.Builder("hiveserver2");
builder.setPort(webUIPort).setConf(hiveConf);
builder.setHost(webHost);
builder.setMaxThreads(
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS));
builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE));
// SessionManager is initialized
builder.setContextAttribute("hive.sm",
cliService.getSessionManager());
hiveConf.set("startcode",
String.valueOf(System.currentTimeMillis()));
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL)) {
String keyStorePath = hiveConf.getVar(
ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_PATH);
if (StringUtils.isBlank(keyStorePath)) {
throw new IllegalArgumentException(
ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_PATH.varname
+ " Not configured for SSL connection");
}
builder.setKeyStorePassword(ShimLoader.getHadoopShims().getPassword(
hiveConf, ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_PASSWORD.varname));
builder.setKeyStorePath(keyStorePath);
builder.setKeyStoreType(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYSTORE_TYPE));
builder.setKeyManagerFactoryAlgorithm(
hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_SSL_KEYMANAGERFACTORY_ALGORITHM));
builder.setExcludeCiphersuites(
hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_SSL_EXCLUDE_CIPHERSUITES));
builder.setUseSSL(true);
}
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_SPNEGO)) {
String spnegoPrincipal = hiveConf.getVar(
ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_PRINCIPAL);
String spnegoKeytab = hiveConf.getVar(
ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_KEYTAB);
if (StringUtils.isBlank(spnegoPrincipal) || StringUtils.isBlank(spnegoKeytab)) {
throw new IllegalArgumentException(
ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_PRINCIPAL.varname
+ "/" + ConfVars.HIVE_SERVER2_WEBUI_SPNEGO_KEYTAB.varname
+ " Not configured for SPNEGO authentication");
}
builder.setSPNEGOPrincipal(spnegoPrincipal);
builder.setSPNEGOKeytab(spnegoKeytab);
builder.setUseSPNEGO(true);
}
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS)) {
builder.setEnableCORS(true);
String allowedOrigins = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_ORIGINS);
String allowedMethods = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_METHODS);
String allowedHeaders = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_HEADERS);
if (StringUtils.isBlank(allowedOrigins) || StringUtils.isBlank(allowedMethods) || StringUtils.isBlank(allowedHeaders)) {
throw new IllegalArgumentException("CORS enabled. But " +
ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_ORIGINS.varname + "/" +
ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_METHODS.varname + "/" +
ConfVars.HIVE_SERVER2_WEBUI_CORS_ALLOWED_HEADERS.varname + "/" +
" is not configured");
}
builder.setAllowedOrigins(allowedOrigins);
builder.setAllowedMethods(allowedMethods);
builder.setAllowedHeaders(allowedHeaders);
LOG.info("CORS enabled - allowed-origins: {} allowed-methods: {} allowed-headers: {}", allowedOrigins,
allowedMethods, allowedHeaders);
}
if(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_XFRAME_ENABLED)){
builder.configureXFrame(true).setXFrameOption(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_XFRAME_VALUE));
}
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM)) {
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL)) {
String hiveServer2PamServices = hiveConf.getVar(ConfVars.HIVE_SERVER2_PAM_SERVICES);
if (hiveServer2PamServices == null || hiveServer2PamServices.isEmpty()) {
throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_PAM_SERVICES.varname + " is not configured.");
}
builder.setPAMAuthenticator(pamAuthenticator == null ? new PamAuthenticator(hiveConf) : pamAuthenticator);
builder.setUsePAM(true);
} else if (hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST)) {
builder.setPAMAuthenticator(pamAuthenticator == null ? new PamAuthenticator(hiveConf) : pamAuthenticator);
builder.setUsePAM(true);
} else {
throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL.varname + " is set to false. It is recommended to set it to true when PAM is used.");
}
}
if (serviceDiscovery && activePassiveHA) {
builder.setContextAttribute("hs2.isLeader", isLeader);
builder.setContextAttribute("hs2.failover.callback", new FailoverHandlerCallback(hs2HARegistry));
builder.setContextAttribute("hiveconf", hiveConf);
builder.addServlet("leader", HS2LeadershipStatus.class);
builder.addServlet("peers", HS2Peers.class);
}
builder.addServlet("llap", LlapServlet.class);
builder.addServlet("jdbcjar", JdbcJarDownloadServlet.class);
builder.setContextRootRewriteTarget("/hiveserver2.jsp");
webServer = builder.build();
webServer.addServlet("query_page", "/query_page.html", QueryProfileServlet.class);
webServer.addServlet("api", "/api/*", QueriesRESTfulAPIServlet.class);
}
}
} catch (IOException ie) {
throw new ServiceException(ie);
}
// Add a shutdown hook for catching SIGTERM & SIGINT
ShutdownHookManager.addShutdownHook(() -> graceful_stop());
}
private void logCompactionParameters(HiveConf hiveConf) {
LOG.info("Compaction HS2 parameters:");
String runWorkerIn = MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN);
LOG.info("hive.metastore.runworker.in = {}", runWorkerIn);
int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
LOG.info("metastore.compactor.worker.threads = {}", numWorkers);
if ("hs2".equals(runWorkerIn) && numWorkers < 1) {
LOG.warn("Invalid number of Compactor Worker threads({}) on HS2", numWorkers);
}
}
private WMFullResourcePlan createTestResourcePlan() {
WMFullResourcePlan resourcePlan;
WMPool pool = new WMPool("testDefault", "llap");
pool.setAllocFraction(1f);
pool.setQueryParallelism(1);
resourcePlan = new WMFullResourcePlan(
new WMResourcePlan("testDefault"), Lists.newArrayList(pool));
resourcePlan.getPlan().setDefaultPoolPath("testDefault");
return resourcePlan;
}
public static boolean isHttpTransportMode(HiveConf hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
if (transportMode == null) {
transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
}
if (transportMode != null
&& (transportMode.equalsIgnoreCase(HiveServer2TransportMode.http.toString()))) {
return true;
}
return false;
}
public static boolean isAllTransportMode(HiveConf hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
if (transportMode == null) {
transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
}
if (transportMode != null && (transportMode.equalsIgnoreCase(HiveServer2TransportMode.all.toString()))) {
return true;
}
return false;
}
public static boolean isKerberosAuthMode(Configuration hiveConf) {
String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname);
if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) {
return true;
}
return false;
}
/**
* ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
*/
private ACLProvider zooKeeperAclProvider;
private ACLProvider getACLProvider(HiveConf hiveConf) {
final boolean isSecure = ZookeeperUtils.isKerberosEnabled(hiveConf);
return new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
List<ACL> nodeAcls = new ArrayList<ACL>();
if (isSecure) {
// Read all to the world
nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
// Create/Delete/Write/Admin to the authenticated user
nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
} else {
// ACLs for znodes on a non-kerberized cluster
// Create/Read/Delete/Write/Admin to the world
nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
}
return nodeAcls;
}
@Override
public List<ACL> getAclForPath(String path) {
return getDefaultAcl();
}
};
}
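/*
 * A minimal sketch (illustrative only, not part of this class) of how a Curator client
 * would consume the provider built above; the variable names here are hypothetical:
 *
 *   CuratorFramework client = CuratorFrameworkFactory.builder()
 *       .connectString(zkEnsemble)                        // hypothetical ensemble string
 *       .aclProvider(zooKeeperAclProvider)                // provider from getACLProvider()
 *       .retryPolicy(new ExponentialBackoffRetry(1000, 3))
 *       .build();
 *   client.start();
 *
 * On a kerberized cluster, znodes created through such a client are world-readable but
 * modifiable only by the authenticated user; otherwise they are fully open.
 */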
/**
 * Add the conf keys and values that HiveServer2 will publish to ZooKeeper.
 * @param hiveConf HiveConf to read the configuration values from
 * @param confsToPublish map to be populated with the configs to publish
 * @param serviceUri URI (host:port) of this HiveServer2 instance
 */
private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish, String serviceUri) {
// Hostname
confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST));
// Hostname:port
confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri);
// Transport mode
confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE));
// Transport specific confs
boolean isHttpTransportMode = isHttpTransportMode(hiveConf);
boolean isAllTransportMode = isAllTransportMode(hiveConf);
if (isHttpTransportMode || isAllTransportMode) {
confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname,
Integer.toString(hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT)));
confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
}
if (!isHttpTransportMode || isAllTransportMode) {
confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
Integer.toString(hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT)));
confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP));
}
// Auth specific confs
confsToPublish.put(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION));
if (isKerberosAuthMode(hiveConf)) {
confsToPublish.put(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname,
hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
}
// SSL conf
confsToPublish.put(ConfVars.HIVE_SERVER2_USE_SSL.varname,
Boolean.toString(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)));
}
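/*
 * When config publishing is enabled (see start()), the map populated above is serialized
 * with Joiner.on(';').withKeyValueSeparator("=") into znode data of roughly this shape
 * (values are illustrative placeholders):
 *
 *   hive.server2.thrift.bind.host=host1.example.com;hive.server2.instance.uri=host1.example.com:10000;
 *   hive.server2.transport.mode=binary;hive.server2.thrift.port=10000;hive.server2.authentication=NONE;...
 */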
/**
 * For a kerberized cluster, dynamically set up the client's JAAS configuration.
 * @param hiveConf HiveConf to read the Kerberos principal and keytab from
 * @throws Exception if the principal or keytab is not configured
 */
private static void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
if (ZookeeperUtils.isKerberosEnabled(hiveConf)) {
String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
if (principal.isEmpty()) {
throw new IOException("HiveServer2 Kerberos principal is empty");
}
String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
if (keyTabFile.isEmpty()) {
throw new IOException("HiveServer2 Kerberos keytab is empty");
}
// Install the JAAS Configuration for the runtime
Utils.setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
}
}
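/*
 * The JAAS configuration installed above is roughly equivalent to the following stanza
 * (the principal and keytab path are placeholders, and the exact module options are an
 * assumption based on the standard ZooKeeper client login setup, not taken from this file):
 *
 *   Client {
 *     com.sun.security.auth.module.Krb5LoginModule required
 *     useKeyTab=true
 *     keyTab="/etc/security/keytabs/hive.keytab"
 *     principal="hive/host1.example.com@EXAMPLE.COM";
 *   };
 */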
public boolean isLeader() {
return isLeader.get();
}
public int getOpenSessionsCount() {
return cliService != null ? cliService.getSessionManager().getOpenSessionCount() : 0;
}
interface FailoverHandler {
void failover() throws Exception;
}
public static class FailoverHandlerCallback implements FailoverHandler {
private HS2ActivePassiveHARegistry hs2HARegistry;
FailoverHandlerCallback(HS2ActivePassiveHARegistry hs2HARegistry) {
this.hs2HARegistry = hs2HARegistry;
}
@Override
public void failover() throws Exception {
hs2HARegistry.failover();
}
}
/**
* The watcher class shuts down the server if there are no more active client
* sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
*/
public class DeRegisterWatcher extends ZKDeRegisterWatcher {
public DeRegisterWatcher(ZooKeeperHiveHelper zooKeeperHiveHelper) {
super(zooKeeperHiveHelper);
}
@Override
public void process(WatchedEvent event) {
super.process(event);
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
// If there are no more active client sessions, stop the server
if (cliService.getSessionManager().getOpenSessionCount() == 0) {
LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ "instances available for dynamic service discovery. "
+ "The last client session has ended - will shutdown now.");
HiveServer2.this.stop();
}
}
}
}
/**
 * @return true if the server instance was deregistered from ZooKeeper, else false. The function might
 * be called even when the instance is not registered with ZooKeeper (see
 * SessionManager.closeSessionInternal()). In that case, return false since the deregistration has
 * not really happened.
 */
public boolean isDeregisteredWithZooKeeper() {
if (serviceDiscovery && !activePassiveHA) {
if (zooKeeperHelper != null) {
return zooKeeperHelper.isDeregisteredWithZooKeeper();
}
}
return false;
}
public String getServerInstanceURI() throws Exception {
if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
}
return thriftCLIService.getServerIPAddress().getHostName() + ":"
+ thriftCLIService.getPortNumber();
}
public String getServerHost() throws Exception {
if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
}
return thriftCLIService.getServerIPAddress().getHostName();
}
@Override
public synchronized void start() {
super.start();
// If we're supporting dynamic service discovery, we'll add the service uri for this
// HiveServer2 instance to Zookeeper as a znode.
HiveConf hiveConf = getHiveConf();
if (!serviceDiscovery || !activePassiveHA) {
allowClientSessions();
}
if (serviceDiscovery) {
try {
if (activePassiveHA) {
hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService);
hs2HARegistry.start();
LOG.info("HS2 HA registry started");
} else {
boolean publishConfigs =
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS);
String instanceURI = getServerInstanceURI();
String znodeData;
if (publishConfigs) {
// HiveServer2 configs that this instance will publish to ZooKeeper,
// so that the clients can read these and configure themselves properly.
addConfsToPublish(hiveConf, confsToPublish, getServerInstanceURI());
znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
} else {
znodeData = instanceURI;
}
// Add a Znode to the specified ZooKeeper with name: serverUri=host:port;
// version=versionInfo; sequence=sequenceNumber
zooKeeperHelper = hiveConf.getZKConfig();
String znodePathPrefix = "serverUri=" + instanceURI + ";" +
"version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
zooKeeperHelper.addServerInstanceToZooKeeper(znodePathPrefix, znodeData,
zooKeeperAclProvider, new DeRegisterWatcher(zooKeeperHelper));
}
} catch (Exception e) {
LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e);
throw new ServiceException(e);
}
}
try {
startPrivilegeSynchronizer(hiveConf);
} catch (Exception e) {
LOG.error("Error starting priviledge synchronizer: ", e);
throw new ServiceException(e);
}
if (webServer != null) {
try {
webServer.start();
LOG.info("Web UI has started on port " + webServer.getPort());
} catch (Exception e) {
LOG.error("Error starting Web UI: ", e);
throw new ServiceException(e);
}
}
if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
if (!activePassiveHA) {
LOG.info("HS2 interactive HA not enabled. Starting tez sessions..");
try {
startOrReconnectTezSessions();
} catch (Exception e) {
LOG.error("Error starting Tez sessions: ", e);
throw new ServiceException(e);
}
} else {
LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader.");
}
}
}
private static class HS2LeaderLatchListener implements LeaderLatchListener {
private HiveServer2 hiveServer2;
private SessionState parentSession;
HS2LeaderLatchListener(final HiveServer2 hs2, final SessionState parentSession) {
this.hiveServer2 = hs2;
this.parentSession = parentSession;
}
// Leadership status changes happen inside the synchronized method LeaderLatch.setLeadership().
// We also use a single-threaded executor service for handling notifications, which guarantees
// ordered notification handling. If a leadership status change happens while Tez sessions are
// being created, the notLeader notification gets queued in the executor service.
@Override
public void isLeader() {
LOG.info("HS2 instance {} became the LEADER. Starting/Reconnecting tez sessions..", hiveServer2.serviceUri);
hiveServer2.isLeader.set(true);
if (parentSession != null) {
SessionState.setCurrentSessionState(parentSession);
}
hiveServer2.startOrReconnectTezSessions();
LOG.info("Started/Reconnected tez sessions.");
hiveServer2.allowClientSessions();
// resolve futures used for testing
if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) {
hiveServer2.isLeaderTestFuture.set(true);
hiveServer2.resetNotLeaderTestFuture();
}
}
@Override
public void notLeader() {
LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
hiveServer2.isLeader.set(false);
hiveServer2.closeAndDisallowHiveSessions();
hiveServer2.stopOrDisconnectTezSessions();
LOG.info("Stopped/Disconnected tez sessions.");
// resolve futures used for testing
if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) {
hiveServer2.notLeaderTestFuture.set(true);
hiveServer2.resetIsLeaderTestFuture();
}
}
}
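/*
 * A minimal sketch of how a LeaderLatchListener like the one above is wired up with
 * Curator (in this class the equivalent wiring is done by
 * hs2HARegistry.registerLeaderLatchListener() in start(); the latch path and variable
 * names below are hypothetical):
 *
 *   LeaderLatch latch = new LeaderLatch(curatorClient, "/hs2ActivePassiveHA/leader-latch");
 *   latch.addListener(listener, singleThreadedExecutor); // executor preserves notification order
 *   latch.start(); // isLeader()/notLeader() fire on leadership changes from here on
 */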
private void startOrReconnectTezSessions() {
LOG.info("Starting/Reconnecting tez sessions..");
// TODO: add tez session reconnect after TEZ-3875
WMFullResourcePlan resourcePlan;
try {
resourcePlan = sessionHive.getActiveResourcePlan();
} catch (HiveException e) {
if (!HiveConf.getBoolVar(getHiveConf(), ConfVars.HIVE_IN_TEST_SSL)) {
throw new RuntimeException(e);
} else {
resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured.
}
}
if (resourcePlan == null && HiveConf.getBoolVar(
getHiveConf(), ConfVars.HIVE_IN_TEST)) {
LOG.info("Creating a default resource plan for test");
resourcePlan = createTestResourcePlan();
}
initAndStartTezSessionPoolManager(resourcePlan);
initAndStartWorkloadManager(resourcePlan);
}
private void allowClientSessions() {
cliService.getSessionManager().allowSessions(true);
}
private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
// Start the Tez session pool here in start() so that the parent session state initializes
// based on the CLIService state; this avoids SessionState.get() returning null during
// createTezDir.
try {
// startPool will be invoked anyway in TezTask. Doing it early initializes triggers for
// non-pool Tez sessions as well.
LOG.info("Initializing tez session pool manager. Active resource plan: {}",
resourcePlan == null || resourcePlan.getPlan() == null ? "null" : resourcePlan.getPlan().getName());
tezSessionPoolManager = TezSessionPoolManager.getInstance();
HiveConf hiveConf = getHiveConf();
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
tezSessionPoolManager.setupPool(hiveConf);
} else {
tezSessionPoolManager.setupNonPool(hiveConf);
}
tezSessionPoolManager.startPool(hiveConf, resourcePlan);
LOG.info("Tez session pool manager initialized.");
} catch (Exception e) {
throw new ServiceException("Unable to setup tez session pool", e);
}
}
private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) {
if (!StringUtils.isEmpty(wmQueue)) {
// Initialize workload management.
LOG.info("Initializing workload management. Active resource plan: {}",
resourcePlan == null || resourcePlan.getPlan() == null ? "null" : resourcePlan.getPlan().getName());
try {
wm = WorkloadManager.create(wmQueue, getHiveConf(), resourcePlan);
wm.start();
LOG.info("Workload manager initialized.");
} catch (Exception e) {
throw new ServiceException("Unable to instantiate and start Workload Manager", e);
}
} else {
LOG.info("Workload management is not enabled as {} config is not set",
ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname);
}
}
private void closeAndDisallowHiveSessions() {
LOG.info("Closing all open hive sessions.");
if (cliService == null) {
return;
}
cliService.getSessionManager().allowSessions(false);
// No sessions can be opened after the above call. Close the existing ones if any.
try {
for (HiveSession session : cliService.getSessionManager().getSessions()) {
cliService.getSessionManager().closeSession(session.getSessionHandle());
}
LOG.info("Closed all open hive sessions");
} catch (HiveSQLException e) {
LOG.error("Unable to close all open sessions.", e);
}
}
private void stopOrDisconnectTezSessions() {
LOG.info("Stopping/Disconnecting tez sessions.");
// There should already be an instance of the session pool manager.
// If not, ignoring is fine while stopping HiveServer2.
if (tezSessionPoolManager != null) {
try {
tezSessionPoolManager.stop();
LOG.info("Stopped tez session pool manager.");
} catch (Exception e) {
LOG.error("Error while stopping tez session pool manager.", e);
}
}
if (wm != null) {
try {
wm.stop();
LOG.info("Stopped workload manager.");
} catch (Exception e) {
LOG.error("Error while stopping workload manager.", e);
}
}
}
/**
 * Decommission HiveServer2. As a consequence, the SessionManager stops
 * opening new sessions, the OperationManager refuses to run new queries, and
 * HiveServer2 deregisters itself from ZooKeeper if service discovery is enabled;
 * decommissioning has no effect on currently running queries.
 */
public synchronized void decommission() {
LOG.info("Decommissioning HiveServer2");
// Remove this server instance from ZooKeeper if dynamic service discovery is set
if (zooKeeperHelper != null && !isDeregisteredWithZooKeeper()) {
try {
zooKeeperHelper.removeServerInstanceFromZooKeeper();
} catch (Exception e) {
LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
}
}
super.decommission();
}
public synchronized void graceful_stop() {
try {
decommission();
long maxTimeForWait = HiveConf.getTimeVar(getHiveConf(),
HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, TimeUnit.MILLISECONDS);
long timeout = maxTimeForWait, startTime = System.currentTimeMillis();
try {
// The service should already be started by the time we reach here, as decommissioning would
// have thrown an IllegalStateException otherwise; so neither cliService nor sessionManager is null.
while (timeout > 0 && !getCliService().getSessionManager().getOperations().isEmpty()) {
// For a graceful stop, sleeping while looping adds little overhead: at most 100ms is
// wasted waiting for the OperationManager to finish, and this code path only executes
// when HS2 is being terminated.
Thread.sleep(Math.min(100, timeout));
timeout = maxTimeForWait + startTime - System.currentTimeMillis();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for all live operations to be done");
Thread.currentThread().interrupt();
}
LOG.info("Spent {}ms waiting for live operations to be done, current live operations: {}"
, System.currentTimeMillis() - startTime
, getCliService().getSessionManager().getOperations().size());
} finally {
stop();
}
}
@Override
public synchronized void stop() {
LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
super.stop();
if (scheduledQueryService != null) {
try {
scheduledQueryService.close();
} catch (Exception e) {
LOG.error("Error stopping schq", e);
}
}
//Shutdown metric collection
MetricSink.getInstance().tearDown();
if (hs2HARegistry != null) {
hs2HARegistry.stop();
shutdownExecutor(leaderActionsExecutorService);
LOG.info("HS2 HA registry stopped");
hs2HARegistry = null;
}
if (webServer != null) {
try {
webServer.stop();
LOG.info("Web UI has stopped");
} catch (Exception e) {
LOG.error("Error stopping Web UI: ", e);
}
}
// Shutdown Metrics
if (MetricsFactory.getInstance() != null) {
try {
MetricsFactory.close();
} catch (Exception e) {
LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ e.getMessage(), e);
}
}
// Remove this server instance from ZooKeeper if dynamic service discovery is set
if (serviceDiscovery && !activePassiveHA) {
try {
if (zooKeeperHelper != null) {
zooKeeperHelper.removeServerInstanceFromZooKeeper();
}
} catch (Exception e) {
LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
}
}
stopOrDisconnectTezSessions();
if (zKClientForPrivSync != null) {
zKClientForPrivSync.close();
}
if (hiveConf != null && HiveSamlUtils
.isSamlAuthMode(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
// This is mostly for testing purposes, to make sure that the SAML client is
// reinitialized after HS2 is restarted.
HiveSaml2Client.shutdown();
}
}
private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) {
leaderActionsExecutorService.shutdown();
try {
if (!leaderActionsExecutorService.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
LOG.warn("Executor service did not terminate in the specified time {} sec", SHUTDOWN_TIME);
List<Runnable> droppedTasks = leaderActionsExecutorService.shutdownNow();
LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed.");
}
} catch (InterruptedException e) {
LOG.warn("Executor service did not terminate in the specified time {} sec. Exception: {}", SHUTDOWN_TIME,
e.getMessage());
List<Runnable> droppedTasks = leaderActionsExecutorService.shutdownNow();
LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed.");
}
}
@VisibleForTesting
public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder()
.namingPattern("cleardanglingscratchdir-%d")
.daemon(true)
.build());
executor.scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false,
HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf), initialWaitInSec,
HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL,
TimeUnit.SECONDS), TimeUnit.SECONDS);
}
}
public void startPrivilegeSynchronizer(HiveConf hiveConf) throws Exception {
if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PRIVILEGE_SYNCHRONIZER)) {
return;
}
PolicyProviderContainer policyContainer = new PolicyProviderContainer();
HiveAuthorizer authorizer = SessionState.get().getAuthorizerV2();
if (authorizer.getHivePolicyProvider() != null) {
policyContainer.addAuthorizer(authorizer);
}
if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.PRE_EVENT_LISTENERS) != null &&
MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.PRE_EVENT_LISTENERS).contains(
"org.apache.hadoop.hive.ql.security.authorization.AuthorizationPreEventListener") &&
MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_AUTHORIZATION_MANAGER)!= null) {
List<HiveMetastoreAuthorizationProvider> providers = HiveUtils.getMetaStoreAuthorizeProviderManagers(
hiveConf, HiveConf.ConfVars.HIVE_METASTORE_AUTHORIZATION_MANAGER, SessionState.get().getAuthenticator());
for (HiveMetastoreAuthorizationProvider provider : providers) {
if (provider.getHivePolicyProvider() != null) {
policyContainer.addAuthorizationProvider(provider);
}
}
}
if (policyContainer.size() > 0) {
setUpZooKeeperAuth(hiveConf);
zKClientForPrivSync = hiveConf.getZKConfig().startZookeeperClient(zooKeeperAclProvider, true);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
String path = ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "leader";
LeaderLatch privilegeSynchronizerLatch = new LeaderLatch(zKClientForPrivSync, path);
privilegeSynchronizerLatch.start();
LOG.info("Find " + policyContainer.size() + " policy to synchronize, start PrivilegeSynchronizer");
Thread privilegeSynchronizerThread = new Thread(
new PrivilegeSynchronizer(privilegeSynchronizerLatch, policyContainer, hiveConf), "PrivilegeSynchronizer");
privilegeSynchronizerThread.setDaemon(true);
privilegeSynchronizerThread.start();
} else {
LOG.warn("No policy provider found; skipping PrivilegeSynchronizer creation");
}
}
private static void startHiveServer2() throws Throwable {
long attempts = 0, maxAttempts = 1;
while (true) {
LOG.info("Starting HiveServer2");
HiveConf hiveConf = new HiveConf();
maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
long retrySleepIntervalMs = hiveConf
.getTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS,
TimeUnit.MILLISECONDS);
HiveServer2 server = null;
try {
// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(hiveConf);
// Schedule a task to clean up dangling scratch dirs periodically; wait initially for a
// random time between 0 and 10 minutes to avoid an initial spike when multiple HS2
// instances start together.
scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600));
server = new HiveServer2();
server.init(hiveConf);
server.start();
try {
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
pauseMonitor.start();
} catch (Throwable t) {
LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
"warned upon.", t);
}
break;
} catch (Throwable throwable) {
if (server != null) {
try {
server.stop();
} catch (Throwable t) {
LOG.info("Exception caught when calling stop of HiveServer2 before retrying start", t);
} finally {
server = null;
}
}
if (++attempts >= maxAttempts) {
throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
} else {
LOG.warn("Error starting HiveServer2 on attempt " + attempts
+ ", will retry in " + retrySleepIntervalMs + "ms", throwable);
try {
Thread.sleep(retrySleepIntervalMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception {
if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) {
int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
for (int i = 0; i < numWorkers; i++) {
Worker w = new Worker();
CompactorThread.initializeAndStartThread(w, hiveConf);
}
LOG.info("This HS2 instance will act as Compactor Worker with {} threads", numWorkers);
}
}
/**
 * Remove from ZooKeeper all znodes corresponding to the given version number.
 *
 * @param versionNumber version substring to match against znode names
 * @throws Exception on ZooKeeper errors
 */
static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
HiveConf hiveConf = new HiveConf();
setUpZooKeeperAuth(hiveConf);
CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
zooKeeperClient.start();
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
List<String> znodePaths =
zooKeeperClient.getChildren().forPath(
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
List<String> znodePathsUpdated;
// Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
for (int i = 0; i < znodePaths.size(); i++) {
String znodePath = znodePaths.get(i);
zkDeleteSignal = new CountDownLatch(1);
if (znodePath.contains("version=" + versionNumber + ";")) {
String fullZnodePath =
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath;
LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
System.out.println("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
zooKeeperClient.delete().guaranteed().inBackground(new DeleteCallBack())
.forPath(fullZnodePath);
// Wait for the delete to complete
zkDeleteSignal.await();
final KeeperException.Code rc = HiveServer2.zkDeleteResultCode;
if (rc != KeeperException.Code.OK) {
throw KeeperException.create(rc);
}
// Get the updated path list
znodePathsUpdated =
zooKeeperClient.getChildren().forPath(
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
// Gives a list of any new paths that may have been created to maintain the persistent ephemeral node
znodePathsUpdated.removeAll(znodePaths);
// Add the new paths to the znodes list. We'll try for their removal as well.
znodePaths.addAll(znodePathsUpdated);
}
}
zooKeeperClient.close();
}
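/*
 * Example (znode names illustrative): under /<rootNamespace>, children look like
 *
 *   serverUri=host1.example.com:10000;version=4.0.0;sequence=0000000000
 *   serverUri=host2.example.com:10000;version=3.1.3;sequence=0000000001
 *
 * A call with versionNumber "3.1.3" deletes only the second znode, since matching is done
 * on the literal substring "version=3.1.3;".
 */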
private static class DeleteCallBack implements BackgroundCallback {
@Override
public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event)
throws Exception {
if (event.getType() == CuratorEventType.DELETE) {
zkDeleteResultCode = KeeperException.Code.get(event.getResultCode());
zkDeleteSignal.countDown();
}
}
}
public static void main(String[] args) {
HiveConf.setLoadHiveServer2Config(true);
try {
ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
// NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
String initLog4jMessage = LogUtils.initHiveLog4j();
LOG.debug(initLog4jMessage);
HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
// Logger debug message from "oproc" after log4j initialize properly
LOG.debug(oproc.getDebugMessage().toString());
// Call the executor which will execute the appropriate command based on the parsed options
oprocResponse.getServerOptionsExecutor().execute();
} catch (LogInitializationException e) {
LOG.error("Error initializing log: " + e.getMessage(), e);
System.exit(-1);
}
}
/**
 * ServerOptionsProcessor.
 * Processes the arguments given to HiveServer2 (-hiveconf property=value),
 * sets the properties as system properties, and creates an appropriate response object,
 * whose executor runs the appropriate command based on the parsed options.
 */
static class ServerOptionsProcessor {
private final Options options = new Options();
private org.apache.commons.cli.CommandLine commandLine;
private final String serverName;
private final StringBuilder debugMessage = new StringBuilder();
@SuppressWarnings("static-access")
ServerOptionsProcessor(String serverName) {
this.serverName = serverName;
// -hiveconf x=y
options.addOption(OptionBuilder
.withValueSeparator()
.hasArgs(2)
.withArgName("property=value")
.withLongOpt("hiveconf")
.withDescription("Use value for given property")
.create());
// -deregister <versionNumber>
options.addOption(OptionBuilder
.hasArgs(1)
.withArgName("versionNumber")
.withLongOpt("deregister")
.withDescription("Deregister all instances of given version from dynamic service discovery")
.create());
// --listHAPeers
options.addOption(OptionBuilder
.hasArgs(0)
.withLongOpt("listHAPeers")
.withDescription("List all HS2 instances when running in Active Passive HA mode")
.create());
// --failover <workerIdentity>
options.addOption(OptionBuilder
.hasArgs(1)
.withArgName("workerIdentity")
.withLongOpt("failover")
.withDescription("Manually failover Active HS2 instance to passive standby mode")
.create());
// --graceful_stop
options.addOption(OptionBuilder
.hasArgs(1)
.isRequired(false)
.withArgName("pid")
.withLongOpt("graceful_stop")
.withDescription("Gracefully stopping HS2 instance of" +
" 'pid'(default: $HIVE_CONF_DIR/hiveserver2.pid) in 'timeout'(default:1800) seconds.")
.create());
// --getHiveConf <key>
options.addOption(OptionBuilder
.hasArg(true)
.withArgName("key")
.withLongOpt("getHiveConf")
.withDescription("Get the value of key from HiveConf")
.create());
options.addOption(new Option("H", "help", false, "Print help information"));
}
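/*
 * Example invocations of the options defined above (assuming the usual
 * $HIVE_HOME/bin launcher; values are placeholders):
 *
 *   hiveserver2 --hiveconf hive.server2.thrift.port=10001
 *   hiveserver2 --deregister 3.1.3
 *   hiveserver2 --listHAPeers
 *   hiveserver2 --failover worker-1-identity
 *   hiveserver2 --graceful_stop 12345
 *   hiveserver2 --getHiveConf hive.server2.thrift.port
 *   hiveserver2 -H
 */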
ServerOptionsProcessorResponse parse(String[] argv) {
try {
commandLine = new GnuParser().parse(options, argv);
// Process --hiveconf
// Get hiveconf param values and set the System property values
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
// Save the logging message for log4j output later, after log4j initializes properly
debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
if ("hive.log.file".equals(propKey) ||
"hive.log.dir".equals(propKey) ||
"hive.root.logger".equals(propKey)) {
throw new IllegalArgumentException("Logs will be split in two "
+ "files if the commandline argument " + propKey + " is "
+ "used. To prevent this use to HADOOP_CLIENT_OPTS -D"
+ propKey + "=" + confProps.getProperty(propKey)
+ " or use the set the value in the configuration file"
+ " (see HIVE-19886)");
}
HiveConf.overrides.put(propKey, confProps.getProperty(propKey));
}
// Process --help
if (commandLine.hasOption('H')) {
return new ServerOptionsProcessorResponse(new HelpOptionExecutor(serverName, options));
}
// Process --deregister
if (commandLine.hasOption("deregister")) {
return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor(
commandLine.getOptionValue("deregister")));
}
// Process --listHAPeers
if (commandLine.hasOption("listHAPeers")) {
return new ServerOptionsProcessorResponse(new ListHAPeersExecutor());
}
// Process --failover
if (commandLine.hasOption("failover")) {
return new ServerOptionsProcessorResponse(new FailoverHS2InstanceExecutor(
commandLine.getOptionValue("failover")
));
}
// Process --getHiveConf
if (commandLine.hasOption("getHiveConf")) {
return new ServerOptionsProcessorResponse(() -> {
String key = commandLine.getOptionValue("getHiveConf");
HiveConf hiveConf = new HiveConf();
HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
String value = hiveConf.get(key);
if (confVars != null && confVars.getValidator() instanceof Validator.TimeValidator) {
Validator.TimeValidator validator = (Validator.TimeValidator) confVars.getValidator();
value = HiveConf.getTimeVar(hiveConf, confVars, validator.getTimeUnit()) + "";
}
System.out.println(key + "=" + value);
});
}
} catch (ParseException e) {
// Error out & exit - we were not able to parse the args successfully
System.err.println("Error starting HiveServer2 with given arguments: ");
System.err.println(e.getMessage());
System.exit(-1);
}
// Default executor, when no option is specified
return new ServerOptionsProcessorResponse(new StartOptionExecutor());
}
StringBuilder getDebugMessage() {
return debugMessage;
}
}
/**
* The response sent back from {@link ServerOptionsProcessor#parse(String[])}
*/
static class ServerOptionsProcessorResponse {
private final ServerOptionsExecutor serverOptionsExecutor;
ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) {
this.serverOptionsExecutor = serverOptionsExecutor;
}
ServerOptionsExecutor getServerOptionsExecutor() {
return serverOptionsExecutor;
}
}
/**
 * The executor interface for running the appropriate HiveServer2 command based on parsed options.
 */
interface ServerOptionsExecutor {
void execute();
}
/**
* HelpOptionExecutor: executes the --help option by printing out the usage
*/
static class HelpOptionExecutor implements ServerOptionsExecutor {
private final Options options;
private final String serverName;
HelpOptionExecutor(String serverName, Options options) {
this.options = options;
this.serverName = serverName;
}
@Override
public void execute() {
new HelpFormatter().printHelp(serverName, options);
System.exit(0);
}
}
/**
* StartOptionExecutor: starts HiveServer2.
* This is the default executor, when no option is specified.
*/
static class StartOptionExecutor implements ServerOptionsExecutor {
@Override
public void execute() {
try {
startHiveServer2();
} catch (Throwable t) {
LOG.error("Error starting HiveServer2", t);
System.exit(-1);
}
}
}
/**
 * DeregisterOptionExecutor: executes the --deregister option by deregistering all HiveServer2
 * instances of a specific version from ZooKeeper.
 */
static class DeregisterOptionExecutor implements ServerOptionsExecutor {
private final String versionNumber;
DeregisterOptionExecutor(String versionNumber) {
this.versionNumber = versionNumber;
}
@Override
public void execute() {
try {
deleteServerInstancesFromZooKeeper(versionNumber);
} catch (Exception e) {
LOG.error("Error deregistering HiveServer2 instances for version: " + versionNumber
+ " from ZooKeeper", e);
System.out.println("Error deregistering HiveServer2 instances for version: " + versionNumber
+ " from ZooKeeper." + e);
System.exit(-1);
}
System.exit(0);
}
}
/**
 * Handler for the --failover <workerIdentity> command. The way failover works is:
 * - the client gets <workerIdentity> from user input
 * - the client uses the HS2 HA registry to get the list of HS2 instances and finds the one that matches <workerIdentity>
 * - if there is a match, the client makes sure the instance is a leader (only a leader can fail over)
 * - if the matched instance is a leader, its web endpoint is obtained from the service record, and the HTTP DELETE method
 *   is invoked on the /leader endpoint (yes, manual failover requires the web UI to be enabled)
 * - the /leader endpoint checks whether admin ACLs are set; if so, it closes and restarts the leader latch, triggering a failover
 */
static class FailoverHS2InstanceExecutor implements ServerOptionsExecutor {
private final String workerIdentity;
FailoverHS2InstanceExecutor(String workerIdentity) {
this.workerIdentity = workerIdentity;
}
@Override
public void execute() {
try {
HiveConf hiveConf = new HiveConf();
HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
Collection<HiveServer2Instance> hs2Instances = haRegistry.getAll();
// no HS2 instances are running
if (hs2Instances.isEmpty()) {
LOG.error("No HiveServer2 instances are running in HA mode");
System.err.println("No HiveServer2 instances are running in HA mode");
System.exit(-1);
}
HiveServer2Instance targetInstance = null;
for (HiveServer2Instance instance : hs2Instances) {
if (instance.getWorkerIdentity().equals(workerIdentity)) {
targetInstance = instance;
break;
}
}
// no match for workerIdentity
if (targetInstance == null) {
LOG.error("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity);
System.err.println("Cannot find any HiveServer2 instance with workerIdentity: " + workerIdentity);
System.exit(-1);
}
// only one HS2 instance available (cannot failover)
if (hs2Instances.size() == 1) {
LOG.error("Only one HiveServer2 instance running in thefail cluster. Cannot failover: " + workerIdentity);
System.err.println("Only one HiveServer2 instance running in the cluster. Cannot failover: " + workerIdentity);
System.exit(-1);
}
// matched HS2 instance is not leader
if (!targetInstance.isLeader()) {
LOG.error("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover");
System.err.println("HiveServer2 instance (workerIdentity: " + workerIdentity + ") is not a leader. Cannot failover");
System.exit(-1);
}
String webPort = targetInstance.getProperties().get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname);
// web port cannot be obtained
if (StringUtils.isEmpty(webPort)) {
LOG.error("Unable to determine web port for instance: " + workerIdentity);
System.err.println("Unable to determine web port for instance: " + workerIdentity);
System.exit(-1);
}
// invoke DELETE /leader endpoint for failover
String webEndpoint = "http://" + targetInstance.getHost() + ":" + webPort + "/leader";
HttpDelete httpDelete = new HttpDelete(webEndpoint);
CloseableHttpResponse httpResponse = null;
try (CloseableHttpClient client = HttpClients.createDefault()) {
int statusCode = -1;
String response = "Response unavailable";
httpResponse = client.execute(httpDelete);
if (httpResponse != null) {
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null) {
response = httpResponse.getStatusLine().getReasonPhrase();
statusCode = httpResponse.getStatusLine().getStatusCode();
if (statusCode == 200) {
System.out.println(EntityUtils.toString(httpResponse.getEntity()));
}
}
}
if (statusCode != 200) {
// Failover didn't succeed - log error and exit
LOG.error("Unable to failover HiveServer2 instance: " + workerIdentity +
". status code: " + statusCode + "error: " + response);
System.err.println("Unable to failover HiveServer2 instance: " + workerIdentity +
". status code: " + statusCode + " error: " + response);
System.exit(-1);
}
} finally {
if (httpResponse != null) {
httpResponse.close();
}
}
} catch (IOException e) {
LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e);
System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e);
System.exit(-1);
}
System.exit(0);
}
}
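/*
 * Note on the failover flow above: the DELETE request it issues is, in effect (host and
 * port are placeholders; 10002 is the default web UI port):
 *
 *   curl -X DELETE http://host1.example.com:10002/leader
 *
 * The request succeeds only when the web UI is enabled on the leader and admin ACLs allow it.
 */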
static class ListHAPeersExecutor implements ServerOptionsExecutor {
@Override
public void execute() {
try {
HiveConf hiveConf = new HiveConf();
HS2ActivePassiveHARegistry haRegistry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
HS2Peers.HS2Instances hs2Instances = new HS2Peers.HS2Instances(haRegistry.getAll());
String jsonOut = hs2Instances.toJson();
System.out.println(jsonOut);
} catch (IOException e) {
LOG.error("Error listing HiveServer2 HA instances from ZooKeeper", e);
System.err.println("Error listing HiveServer2 HA instances from ZooKeeper" + e);
System.exit(-1);
}
System.exit(0);
}
}
}