blob: 2af557e2edf185106b0390617aeaf0ce9f717148 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import org.apache.commons.cli.OptionBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.LogUtils;
import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Standalone Hive metastore server. Sets up the Thrift service (optionally
 * SASL/Kerberos or SSL secured), metrics, ZooKeeper service discovery and the
 * background housekeeping/compaction threads.
 *
 * TODO:pc remove application logic to a separate interface.
 */
public class HiveMetaStore extends ThriftHiveMetastore {
public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStore.class);
// boolean that tells if the HiveMetaStore (remote) server is being used.
// Can be used to determine if the calls to metastore api (HMSHandler) are being made with
// embedded metastore or a remote one
private static boolean isMetaStoreRemote = false;
// Manages the JVM shutdown hooks registered in main() and startupShutdownMessage().
private static ShutdownHookManager shutdownHookMgr;
/** MM write states. */
public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', MM_WRITE_ABORTED = 'a';
// SASL server and delegation token manager; only initialized in startMetaStore()
// when running with SASL on kerberized Hadoop (see the useSasl branch there).
static HadoopThriftAuthBridge.Server saslServer;
private static MetastoreDelegationTokenManager delegationTokenManager;
// True when the server was started with SASL on kerberized Hadoop; set in startMetaStore().
static boolean useSasl;
// ZooKeeper helper for dynamic service discovery; non-null only after startMetaStore()
// registered this instance (THRIFT_SERVICE_DISCOVERY_MODE == "zookeeper").
private static ZooKeeperHiveHelper zooKeeperHelper = null;
// Host the thrift server is bound to (ConfVars.THRIFT_BIND_HOST); set in startMetaStore().
private static String msHost = null;
/**
 * Decides whether a rename that moves an object between the two given databases
 * is permitted. A cross-database rename is rejected when either side is a
 * source of replication; renames within the same database are always allowed.
 *
 * @param srcDB database the object currently lives in
 * @param destDB database the object would be moved to
 * @return true if the rename may proceed
 */
public static boolean isRenameAllowed(Database srcDB, Database destDB) {
  boolean sameDatabase = srcDB.getName().equalsIgnoreCase(destDB.getName());
  if (sameDatabase) {
    return true;
  }
  // Crossing database boundaries: block the rename if either end replicates out.
  return !(ReplChangeManager.isSourceOfReplication(srcDB)
      || ReplChangeManager.isSourceOfReplication(destDB));
}
/**
 * Wraps the given handler in a retrying proxy for a non-local (remote server) context.
 *
 * @param baseHandler the underlying handler to wrap
 * @param conf configuration used to build the retrying proxy
 * @return a retrying proxy around {@code baseHandler}
 * @throws MetaException if the proxy cannot be created
 */
private static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, Configuration conf)
    throws MetaException {
  return newRetryingHMSHandler(baseHandler, conf, false);
}
/**
 * Wraps the given handler in a retrying proxy.
 *
 * @param baseHandler the underlying handler to wrap
 * @param conf configuration used to build the retrying proxy
 * @param local whether the handler serves an embedded (in-process) metastore
 * @return a retrying proxy around {@code baseHandler}
 * @throws MetaException if the proxy cannot be created
 */
private static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, Configuration conf,
    boolean local) throws MetaException {
  return RetryingHMSHandler.getProxy(conf, baseHandler, local);
}
/**
 * Create retrying HMS handler for embedded metastore.
 *
 * <h1>IMPORTANT</h1>
 *
 * This method is called indirectly by HiveMetastoreClient and HiveMetaStoreClientPreCatalog
 * using reflection. It can not be removed and its arguments can't be changed without matching
 * change in HiveMetastoreClient and HiveMetaStoreClientPreCatalog.
 *
 * @param conf configuration to use
 * @return retrying proxy around a freshly created embedded handler
 * @throws MetaException if the handler or proxy cannot be created
 */
static Iface newRetryingHMSHandler(Configuration conf)
    throws MetaException {
  // Embedded mode: build the handler inline and wrap it in a local retrying proxy.
  return RetryingHMSHandler.getProxy(conf, new HMSHandler("hive client", conf, false), true);
}
/**
 * Discard a current delegation token.
 *
 * @param tokenStrForm
 *          the token in string form
 * @throws IOException if the token cannot be cancelled
 */
// NOTE(review): delegationTokenManager is only assigned in startMetaStore() when
// SASL with kerberized Hadoop is in use; calling this in unsecured mode would NPE.
public static void cancelDelegationToken(String tokenStrForm) throws IOException {
  delegationTokenManager.cancelDelegationToken(tokenStrForm);
}
/**
 * Get a new delegation token.
 *
 * @param owner
 *          the owner of the requested token
 * @param renewer
 *          the designated renewer
 * @param remoteAddr
 *          address of the remote caller requesting the token
 * @return the new delegation token in string form
 * @throws IOException on token-manager failure
 * @throws InterruptedException if the issuing thread is interrupted
 */
// NOTE(review): like cancelDelegationToken, this requires the SASL-mode
// delegationTokenManager to have been initialized in startMetaStore().
public static String getDelegationToken(String owner, String renewer, String remoteAddr)
    throws IOException, InterruptedException {
  return delegationTokenManager.getDelegationToken(owner, renewer, remoteAddr);
}
/**
 * Tells whether this JVM is running the remote (standalone) metastore server;
 * the flag is set at the top of {@link #startMetaStore}.
 *
 * @return true if remote metastore has been created
 */
public static boolean isMetaStoreRemote() {
  return isMetaStoreRemote;
}
/**
 * Renew a delegation token to extend its lifetime.
 *
 * @param tokenStrForm
 *          the token in string form
 * @return the new expiration time of the token
 * @throws IOException on token-manager failure
 */
// NOTE(review): requires the SASL-mode delegationTokenManager initialized in startMetaStore().
public static long renewDelegationToken(String tokenStrForm) throws IOException {
  return delegationTokenManager.renewDelegationToken(tokenStrForm);
}
/**
 * HiveMetaStore specific CLI.
 *
 * Resolves the server port from, in increasing precedence: the configured
 * default (SERVER_PORT), a deprecated naked port argument, the METASTORE_PORT
 * environment variable, and finally an explicit -p option.
 */
public static class HiveMetastoreCli extends CommonCliOptions {
  private int port;

  @SuppressWarnings("static-access")
  HiveMetastoreCli(Configuration configuration) {
    super("hivemetastore", true);
    this.port = MetastoreConf.getIntVar(configuration, ConfVars.SERVER_PORT);

    // -p port
    OPTIONS.addOption(OptionBuilder
        .hasArg()
        .withArgName("port")
        .withDescription("Hive Metastore port number, default:"
            + this.port)
        .create('p'));
  }

  @Override
  public void parse(String[] args) {
    super.parse(args);

    // Accept the legacy "hivemetastore [port]" syntax, but warn about it.
    String[] leftover = commandLine.getArgs();
    if (leftover.length > 0) {
      System.err.println(
          "This usage has been deprecated, consider using the new command "
              + "line syntax (run with -h to see usage information)");
      this.port = Integer.parseInt(leftover[0]);
    }
    // An explicit -p (or, failing that, the METASTORE_PORT environment
    // variable) takes precedence over the deprecated naked argument.
    if (commandLine.hasOption('p')) {
      this.port = Integer.parseInt(commandLine.getOptionValue('p'));
    } else {
      // legacy handling
      String envPort = System.getenv("METASTORE_PORT");
      if (envPort != null) {
        this.port = Integer.parseInt(envPort);
      }
    }
  }

  /** @return the resolved metastore server port. */
  public int getPort() {
    return this.port;
  }
}
/**
 * Entry point for the standalone metastore server process. Parses the CLI,
 * initializes logging and metrics, registers shutdown hooks and then starts
 * the Thrift metastore service (which blocks until shutdown).
 *
 * @param args command line arguments; see {@link HiveMetastoreCli}
 * @throws Throwable any startup failure, after logging it
 */
public static void main(String[] args) throws Throwable {
  final Configuration conf = MetastoreConf.newMetastoreConf();
  shutdownHookMgr = ShutdownHookManager.get();
  HiveMetastoreCli cli = new HiveMetastoreCli(conf);
  cli.parse(args);
  final boolean isCliVerbose = cli.isVerbose();
  // NOTE: It is critical to do this prior to initializing log4j, otherwise
  // any log specific settings via hiveconf will be ignored
  Properties hiveconf = cli.addHiveconfToSystemProperties();
  // NOTE: It is critical to do this here so that log4j is reinitialized
  // before any of the other core hive classes are loaded
  try {
    // If the log4j.configuration property hasn't already been explicitly set,
    // use Hive's default log4j configuration
    if (System.getProperty("log4j.configurationFile") == null) {
      LogUtils.initHiveLog4j(conf);
    } else {
      // reconfigure log4j after the settings from hiveconf were written into
      // the System properties above
      LoggerContext context = (LoggerContext)LogManager.getContext(false);
      context.reconfigure();
    }
  } catch (LogUtils.LogInitializationException e) {
    // Logging setup failure is non-fatal; continue with whatever config is in place.
    LOG.warn(e.getMessage());
  }
  startupShutdownMessage(HiveMetaStore.class, args, LOG);
  try {
    String msg = "Starting hive metastore on port " + cli.port;
    LOG.info(msg);
    if (cli.isVerbose()) {
      System.err.println(msg);
    }
    // set all properties specified on the command line
    for (Map.Entry<Object, Object> item : hiveconf.entrySet()) {
      conf.set((String) item.getKey(), (String) item.getValue());
    }
    // For the metastore process itself every metastore call must be an embedded
    // call; clearing THRIFT_URIS forces the in-process handler to be used.
    conf.set(ConfVars.THRIFT_URIS.getHiveName(), "");
    // Add shutdown hook: logs, stops metrics, deregisters from ZooKeeper and
    // drains the task thread pool.
    shutdownHookMgr.addShutdownHook(() -> {
      String shutdownMsg = "Shutting down hive metastore.";
      LOG.info(shutdownMsg);
      if (isCliVerbose) {
        System.err.println(shutdownMsg);
      }
      if (MetastoreConf.getBoolVar(conf, ConfVars.METRICS_ENABLED)) {
        try {
          Metrics.shutdown();
        } catch (Exception e) {
          LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
              + e.getMessage(), e);
        }
      }
      // Remove from zookeeper if it's configured
      try {
        if (MetastoreConf.getVar(conf, ConfVars.THRIFT_SERVICE_DISCOVERY_MODE)
            .equalsIgnoreCase("zookeeper")) {
          zooKeeperHelper.removeServerInstanceFromZooKeeper();
        }
      } catch (Exception e) {
        LOG.error("Error removing znode for this metastore instance from ZooKeeper.", e);
      }
      ThreadPool.shutdown();
    }, 10);
    // Start Metrics for Standalone (Remote) Mode
    if (MetastoreConf.getBoolVar(conf, ConfVars.METRICS_ENABLED)) {
      try {
        Metrics.initialize(conf);
      } catch (Exception e) {
        // log exception, but ignore inability to start
        LOG.error("error in Metrics init: " + e.getClass().getName() + " "
            + e.getMessage(), e);
      }
    }
    // Blocks inside tServer.serve() until the process shuts down.
    startMetaStore(cli.getPort(), HadoopThriftAuthBridge.getBridge(), conf, true, null);
  } catch (Throwable t) {
    // Catch the exception, log it and rethrow it.
    LOG.error("Metastore Thrift Server threw an exception...", t);
    throw t;
  }
}
/**
 * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
 * Uses a freshly created default configuration and does not start the
 * background (compactor/housekeeping) threads.
 *
 * @param port port on which the Thrift server will serve
 * @param bridge thrift auth bridge used for secure-mode transports
 * @throws Throwable on any startup failure
 */
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge)
    throws Throwable {
  startMetaStore(port, bridge, MetastoreConf.newMetastoreConf(), false, null);
}
/**
 * Start the metastore store with the given configuration; background
 * (compactor/housekeeping) threads are not started.
 *
 * @param port port on which the Thrift server will serve
 * @param bridge thrift auth bridge used for secure-mode transports
 * @param conf configuration overrides
 * @throws Throwable on any startup failure
 */
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
    Configuration conf) throws Throwable {
  startMetaStore(port, bridge, conf, false, null);
}
/**
 * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
 *
 * This method blocks in {@code tServer.serve()} and only returns when the
 * server stops.
 *
 * @param port The port on which the Thrift server will start to serve
 * @param bridge thrift auth bridge used to build SASL transports in secure mode
 * @param conf Configuration overrides
 * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.)
 * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true,
 *     when all of the background threads are scheduled. Useful for testing purposes to wait
 *     until the MetaStore is fully initialized.
 * @throws Throwable on any startup failure
 */
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
    Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
  isMetaStoreRemote = true;
  // Server will create new threads up to max as necessary. After an idle
  // period, it will destroy threads to keep the number of threads in the
  // pool to min.
  long maxMessageSize = MetastoreConf.getLongVar(conf, ConfVars.SERVER_MAX_MESSAGE_SIZE);
  int minWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MIN_THREADS);
  int maxWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MAX_THREADS);
  boolean tcpKeepAlive = MetastoreConf.getBoolVar(conf, ConfVars.TCP_KEEP_ALIVE);
  boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
  boolean useSSL = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
  HMSHandler baseHandler = new HMSHandler("new db based metaserver", conf, false);
  AuthFactory authFactory = new AuthFactory(bridge, conf, baseHandler);
  useSasl = authFactory.isSASLWithKerberizedHadoop();
  if (useSasl) {
    // we are in secure mode. Login using keytab
    String kerberosName = SecurityUtil
        .getServerPrincipal(MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL), "0.0.0.0");
    String keyTabFile = MetastoreConf.getVar(conf, ConfVars.KERBEROS_KEYTAB_FILE);
    UserGroupInformation.loginUserFromKeytab(kerberosName, keyTabFile);
    saslServer = authFactory.getSaslServer();
    delegationTokenManager = authFactory.getDelegationTokenManager();
  }
  TProcessor processor;
  TTransportFactory transFactory = authFactory.getAuthTransFactory(useSSL, conf);
  final TProtocolFactory protocolFactory;
  final TProtocolFactory inputProtoFactory;
  // Wire protocol: compact or binary; the configured max message size is
  // enforced on the input side only.
  if (useCompactProtocol) {
    protocolFactory = new TCompactProtocol.Factory();
    inputProtoFactory = new TCompactProtocol.Factory(maxMessageSize, maxMessageSize);
  } else {
    protocolFactory = new TBinaryProtocol.Factory();
    inputProtoFactory = new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize);
  }
  IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
  TServerSocket serverSocket;
  // Processor selection: SASL-wrapped in secure mode; otherwise either a
  // UGI-propagating processor or one that just records the client IP.
  if (useSasl) {
    processor = saslServer.wrapProcessor(
        new ThriftHiveMetastore.Processor<>(handler));
    LOG.info("Starting DB backed MetaStore Server in Secure Mode");
  } else {
    // we are in unsecure mode.
    if (MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) {
      processor = new TUGIBasedProcessor<>(handler);
      LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
    } else {
      processor = new TSetIpAddressProcessor<>(handler);
      LOG.info("Starting DB backed MetaStore Server");
    }
  }
  msHost = MetastoreConf.getVar(conf, ConfVars.THRIFT_BIND_HOST);
  if (msHost != null && !msHost.trim().isEmpty()) {
    LOG.info("Binding host " + msHost + " for metastore server");
  }
  // Server socket: plain or SSL, depending on configuration.
  if (!useSSL) {
    serverSocket = SecurityUtils.getServerSocket(msHost, port);
  } else {
    String keyStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_PATH).trim();
    if (keyStorePath.isEmpty()) {
      throw new IllegalArgumentException(ConfVars.SSL_KEYSTORE_PATH.toString()
          + " Not configured for SSL connection");
    }
    String keyStorePassword =
        MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD);
    String keyStoreType =
        MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
    String keyStoreAlgorithm =
        MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
    // enable SSL support for HMS
    List<String> sslVersionBlacklist = new ArrayList<>();
    for (String sslVersion : MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(",")) {
      sslVersionBlacklist.add(sslVersion);
    }
    serverSocket = SecurityUtils.getServerSSLSocket(msHost, port, keyStorePath,
        keyStorePassword, keyStoreType, keyStoreAlgorithm, sslVersionBlacklist);
  }
  if (tcpKeepAlive) {
    serverSocket = new TServerSocketKeepAlive(serverSocket);
  }
  TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
      .processor(processor)
      .transportFactory(transFactory)
      .protocolFactory(protocolFactory)
      .inputProtocolFactory(inputProtoFactory)
      .minWorkerThreads(minWorkerThreads)
      .maxWorkerThreads(maxWorkerThreads);
  TServer tServer = new TThreadPoolServer(args);
  // Event handler: maintains the open-connections metric and cleans up the
  // per-thread RawStore when a client connection goes away.
  TServerEventHandler tServerEventHandler = new TServerEventHandler() {
    @Override
    public void preServe() {
    }
    @Override
    public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
      Metrics.getOpenConnectionsCounter().inc();
      return null;
    }
    @Override
    public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
      Metrics.getOpenConnectionsCounter().dec();
      // If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already
      // cleaned up thread local RawStore. Otherwise, do it now.
      HMSHandler.cleanupRawStore();
    }
    @Override
    public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
    }
  };
  tServer.setServerEventHandler(tServerEventHandler);
  LOG.info("Started the new metaserver on port [" + port
      + "]...");
  LOG.info("Options.minWorkerThreads = "
      + minWorkerThreads);
  LOG.info("Options.maxWorkerThreads = "
      + maxWorkerThreads);
  LOG.info("TCP keepalive = " + tcpKeepAlive);
  LOG.info("Enable SSL = " + useSSL);
  logCompactionParameters(conf);
  boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL);
  LOG.info("Direct SQL optimization = {}", directSqlEnabled);
  if (startMetaStoreThreads) {
    Lock metaStoreThreadsLock = new ReentrantLock();
    Condition startCondition = metaStoreThreadsLock.newCondition();
    AtomicBoolean startedServing = new AtomicBoolean();
    // The background threads wait on startCondition until the signalling thread
    // observes the server actually serving.
    startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing,
        isMetastoreHousekeepingLeader(conf, getServerHostName()), startedBackgroundThreads);
    signalOtherThreadsToStart(tServer, metaStoreThreadsLock, startCondition, startedServing);
  }
  // If dynamic service discovery through ZooKeeper is enabled, add this server to the ZooKeeper.
  if (MetastoreConf.getVar(conf, ConfVars.THRIFT_SERVICE_DISCOVERY_MODE)
      .equalsIgnoreCase("zookeeper")) {
    try {
      zooKeeperHelper = MetastoreConf.getZKConfig(conf);
      String serverInstanceURI = getServerInstanceURI(port);
      zooKeeperHelper.addServerInstanceToZooKeeper(serverInstanceURI, serverInstanceURI, null,
          new ZKDeRegisterWatcher(zooKeeperHelper));
      LOG.info("Metastore server instance with URL " + serverInstanceURI + " added to " +
          "the zookeeper");
    } catch (Exception e) {
      LOG.error("Error adding this metastore instance to ZooKeeper: ", e);
      throw e;
    }
  }
  // Blocks serving requests until the server is stopped.
  tServer.serve();
}
/**
 * Logs the compaction-related HMS configuration at startup and warns about
 * configurations under which automatic compaction will not run.
 *
 * @param conf the metastore configuration to report on
 */
private static void logCompactionParameters(Configuration conf) {
  LOG.info("Compaction HMS parameters:");
  LOG.info("metastore.compactor.initiator.on = {}", MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON));
  LOG.info("metastore.compactor.worker.threads = {}",
      MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_WORKER_THREADS));
  LOG.info("hive.metastore.runworker.in = {}", MetastoreConf.getVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN));
  LOG.info("metastore.compactor.history.retention.attempted = {}",
      MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE));
  LOG.info("metastore.compactor.history.retention.failed = {}",
      MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED));
  LOG.info("metastore.compactor.history.retention.succeeded = {}",
      MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
  LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}",
      MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD));
  // FIX: the original format string had no "{}" placeholder, so SLF4J silently
  // dropped the value argument and the setting was never actually logged.
  LOG.info("metastore.compactor.enable.stats.compression = {}",
      MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION));
  if (!MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
    LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered.");
  }
  if (MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) {
    int numThreads = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_WORKER_THREADS);
    if (numThreads < 1) {
      LOG.warn("Invalid number of Compactor Worker threads({}) on HMS", numThreads);
    }
  }
}
/**
 * Decides whether this HMS instance should run the housekeeping threads.
 *
 * @param conf metastore configuration
 * @param serverHost host name of this server instance
 * @return true when no leader host is configured (backward-compatible default)
 *         or when the configured leader host matches {@code serverHost}
 */
private static boolean isMetastoreHousekeepingLeader(Configuration conf, String serverHost) {
  String leaderHost =
      MetastoreConf.getVar(conf,
          MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
  // For the sake of backward compatibility: with no leader configured, every
  // instance behaves as the leader.
  boolean noLeaderConfigured = leaderHost == null || leaderHost.isEmpty();
  if (noLeaderConfigured) {
    LOG.info(ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME + " is empty. Start all the " +
        "housekeeping threads.");
    return true;
  }
  LOG.info(ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME + " is set to " + leaderHost);
  return leaderHost.trim().equals(serverHost);
}
/**
 * @param port where metastore server is running
 * @return metastore server instance URL. If the metastore server was bound to a configured
 * host, return that appended by port. Otherwise return the externally visible URL of the local
 * host with the given port
 * @throws Exception if the local host name cannot be resolved
 */
private static String getServerInstanceURI(int port) throws Exception {
  return getServerHostName() + ":" + port;
}
/**
 * Resolves the host name this server instance is identified by: the configured
 * bind host ({@code msHost}) when set, otherwise the local host name.
 *
 * @return trimmed configured host, or the local host name when none is configured
 * @throws Exception if the local host name cannot be resolved
 */
private static String getServerHostName() throws Exception {
  String configured = (msHost == null) ? "" : msHost.trim();
  return configured.isEmpty() ? InetAddress.getLocalHost().getHostName() : configured;
}
/**
 * Spawns a helper thread that polls once per second until the thrift server
 * reports it is serving, then flips {@code startedServing} and signals every
 * waiter on {@code startCondition}.
 *
 * @param server the thrift server to watch
 * @param startLock lock guarding {@code startCondition}
 * @param startCondition condition signalled once the server is serving
 * @param startedServing flag set to true once the server is serving
 */
private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
                                              final Condition startCondition,
                                              final AtomicBoolean startedServing) {
  Thread t = new Thread(() -> {
    // Poll until the server reports it is serving.
    do {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        LOG.warn("Signalling thread was interrupted: " + e.getMessage());
      }
    } while (!server.isServing());
    // Set the flag before signalling so waiters re-checking it observe true.
    startLock.lock();
    try {
      startedServing.set(true);
      startCondition.signalAll();
    } finally {
      startLock.unlock();
    }
  });
  t.start();
}
/**
 * Start threads outside of the thrift service, such as the compactor threads.
 * @param conf Hive configuration object
 * @param startLock lock guarding {@code startCondition}
 * @param startCondition condition signalled once the thrift server is serving
 * @param startedServing flag flipped to true once the thrift server is serving
 * @param isLeader true if this metastore is a leader. Most of the housekeeping threads are
 *                 started only in a leader HMS.
 * @param startedBackGroundThreads if non-null, set to true after all background
 *                                 threads have been scheduled (used by tests)
 */
private static void startMetaStoreThreads(final Configuration conf, final Lock startLock,
    final Condition startCondition, final AtomicBoolean startedServing, boolean isLeader,
    final AtomicBoolean startedBackGroundThreads) {
  // A thread is spun up to start these other threads. That's because we can't start them
  // until after the TServer has started, but once TServer.serve is called we aren't given back
  // control.
  Thread t = new Thread() {
    @Override
    public void run() {
      // This is a massive hack. The compactor threads have to access packages in ql (such as
      // AcidInputFormat). ql depends on metastore so we can't directly access those. To deal
      // with this the compactor thread classes have been put in ql and they are instantiated here
      // dynamically. This is not ideal but it avoids a massive refactoring of Hive packages.
      //
      // Wrap the start of the threads in a catch Throwable loop so that any failures
      // don't doom the rest of the metastore.
      startLock.lock();
      try {
        JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();
      } catch (Throwable t) {
        // NOTE: this catch parameter shadows the enclosing Thread variable "t".
        LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
            "warned upon.", t);
      }
      try {
        // Per the javadocs on Condition, do not depend on the condition alone as a start gate
        // since spurious wake ups are possible.
        while (!startedServing.get()) {
          startCondition.await();
        }
        if (isLeader) {
          startCompactorInitiator(conf);
          startCompactorCleaner(conf);
          startRemoteOnlyTasks(conf);
          startStatsUpdater(conf);
          HMSHandler.startAlwaysTaskThreads(conf);
        }
        // The leader HMS may not necessarily have sufficient compute capacity required to run
        // actual compaction work. So it can run on a non-leader HMS with sufficient capacity
        // or a configured HS2 instance.
        if (MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) {
          startCompactorWorkers(conf);
        }
      } catch (Throwable e) {
        // Deliberately swallowed after logging: the thrift service keeps
        // running even if the background threads could not be started.
        LOG.error("Failure when starting the compactor, compactions may not happen, " +
            StringUtils.stringifyException(e));
      } finally {
        startLock.unlock();
      }
      if (isLeader) {
        ReplChangeManager.scheduleCMClearer(conf);
      }
      if (startedBackGroundThreads != null) {
        startedBackGroundThreads.set(true);
      }
    }
  };
  t.setDaemon(true);
  t.setName("Metastore threads starter thread");
  t.start();
}
/**
 * Starts the automatic statistics updater thread, unless STATS_AUTO_UPDATE
 * is configured as NONE.
 *
 * @param conf metastore configuration
 * @throws Exception if the thread class cannot be loaded or started
 */
protected static void startStatsUpdater(Configuration conf) throws Exception {
  StatsUpdateMode mode = StatsUpdateMode.valueOf(
      MetastoreConf.getVar(conf, ConfVars.STATS_AUTO_UPDATE).toUpperCase());
  if (mode != StatsUpdateMode.NONE) {
    // The updater lives in the ql package, so it is loaded reflectively.
    initializeAndStartThread(
        instantiateThread("org.apache.hadoop.hive.ql.stats.StatsUpdaterThread"), conf);
  }
}
/**
 * Starts the compaction Initiator thread when COMPACTOR_INITIATOR_ON is enabled.
 *
 * @param conf metastore configuration
 * @throws Exception if the thread class cannot be loaded or started
 */
private static void startCompactorInitiator(Configuration conf) throws Exception {
  if (!MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
    return;
  }
  MetaStoreThread initiator =
      instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
  initializeAndStartThread(initiator, conf);
  LOG.info("This HMS instance will act as a Compactor Initiator.");
}
/**
 * Starts COMPACTOR_WORKER_THREADS compaction Worker threads.
 *
 * @param conf metastore configuration
 * @throws Exception if a worker class cannot be loaded or started
 */
private static void startCompactorWorkers(Configuration conf) throws Exception {
  final int numWorkers = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_WORKER_THREADS);
  for (int workerIndex = 0; workerIndex < numWorkers; workerIndex++) {
    initializeAndStartThread(
        instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker"), conf);
  }
  LOG.info("This HMS instance will act as a Compactor Worker with {} threads", numWorkers);
}
/**
 * Starts the compaction Cleaner thread when COMPACTOR_INITIATOR_ON is enabled.
 *
 * @param conf metastore configuration
 * @throws Exception if the thread class cannot be loaded or started
 */
private static void startCompactorCleaner(Configuration conf) throws Exception {
  if (!MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
    return;
  }
  MetaStoreThread cleaner =
      instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
  initializeAndStartThread(cleaner, conf);
  LOG.info("This HMS instance will act as a Compactor Cleaner.");
}
/**
 * Reflectively loads and instantiates a {@link MetaStoreThread} implementation
 * by class name. Used for the compactor/stats threads that live in the ql
 * package, which the metastore cannot depend on directly.
 *
 * @param classname fully qualified name of the class to instantiate
 * @return the new instance, guaranteed to be a MetaStoreThread
 * @throws IOException if the class is not a MetaStoreThread
 * @throws Exception if the class cannot be loaded or instantiated
 */
private static MetaStoreThread instantiateThread(String classname) throws Exception {
  Class<?> c = Class.forName(classname);
  // getDeclaredConstructor().newInstance() replaces the deprecated
  // Class.newInstance(), which propagated checked constructor exceptions
  // without declaring them.
  Object o = c.getDeclaredConstructor().newInstance();
  if (o instanceof MetaStoreThread) {
    return (MetaStoreThread) o;
  }
  String s = classname + " is not an instance of MetaStoreThread.";
  LOG.error(s);
  throw new IOException(s);
}
// Monotonically increasing id handed to each background metastore thread; starts
// at a high value, presumably to avoid colliding with ids used elsewhere — TODO confirm.
// All call sites visible here run on the single "Metastore threads starter thread",
// so the unsynchronized increment appears safe.
private static int nextThreadId = 1000000;
/**
 * Configures and starts one background metastore thread: injects the
 * configuration, assigns the next thread id, initializes and starts it.
 *
 * @param thread the thread to configure and start
 * @param conf metastore configuration passed to the thread
 * @throws Exception if thread initialization fails
 */
private static void initializeAndStartThread(MetaStoreThread thread, Configuration conf) throws
    Exception {
  LOG.info("Starting metastore thread of type " + thread.getClass().getName());
  thread.setConf(conf);
  thread.setThreadId(nextThreadId++);
  thread.init(new AtomicBoolean());
  thread.start();
}
/**
 * Schedules the housekeeping tasks that only run in a remote metastore
 * (TASK_THREADS_REMOTE_ONLY), each at its own fixed frequency on the shared
 * thread pool. No-op when METASTORE_HOUSEKEEPING_THREADS_ON is disabled.
 *
 * @param conf metastore configuration
 * @throws Exception if a task class cannot be loaded or instantiated
 */
private static void startRemoteOnlyTasks(Configuration conf) throws Exception {
  if (!MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_HOUSEKEEPING_THREADS_ON)) {
    return;
  }
  ThreadPool.initialize(conf);
  for (String taskName : MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_REMOTE_ONLY)) {
    MetastoreTaskThread task =
        JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
    task.setConf(conf);
    long freq = task.runFrequency(TimeUnit.MILLISECONDS);
    LOG.info("Scheduling for " + task.getClass().getCanonicalName() + " service.");
    ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
  }
}
/**
 * Print a log message for starting up and shutting down.
 * @param clazz the class of the server
 * @param args arguments
 * @param LOG the target log object
 */
private static void startupShutdownMessage(Class<?> clazz, String[] args,
    final org.slf4j.Logger LOG) {
  final String hostname = getHostname();
  final String classname = clazz.getSimpleName();
  // Assemble the banner lines once, then log them framed by toStartupShutdownString.
  final String[] banner = {
      "Starting " + classname,
      " host = " + hostname,
      " args = " + Arrays.asList(args),
      " version = " + MetastoreVersionInfo.getVersion(),
      " classpath = " + System.getProperty("java.class.path"),
      " build = " + MetastoreVersionInfo.getUrl() + " -r "
          + MetastoreVersionInfo.getRevision()
          + "; compiled by '" + MetastoreVersionInfo.getUser()
          + "' on " + MetastoreVersionInfo.getDate()
  };
  LOG.info(toStartupShutdownString("STARTUP_MSG: ", banner));
  // Register a hook so an equivalent banner is logged at JVM shutdown.
  shutdownHookMgr.addShutdownHook(
      () -> LOG.info(toStartupShutdownString("SHUTDOWN_MSG: ", new String[] {
          "Shutting down " + classname + " at " + hostname})), 0);
}
/**
 * Return a message for logging.
 *
 * Frames the given lines between an opening and a closing asterisk rule, with
 * the prefix repeated at the start of every line.
 *
 * @param prefix prefix keyword for the message
 * @param msg content of the message
 * @return a message for logging
 */
private static String toStartupShutdownString(String prefix, String[] msg) {
  StringBuilder banner = new StringBuilder(prefix)
      .append("\n/************************************************************");
  for (String line : msg) {
    banner.append("\n").append(prefix).append(line);
  }
  return banner
      .append("\n************************************************************/")
      .toString();
}
/**
 * Return hostname without throwing exception.
 * On lookup failure the exception's string form is returned instead, so the
 * caller always gets a non-null printable value.
 * @return hostname
 */
private static String getHostname() {
  try {
    return "" + InetAddress.getLocalHost();
  } catch(UnknownHostException uhe) {
    // Fall back to the exception text rather than propagating — this is only
    // used for log banners.
    return "" + uhe;
  }
}
}