blob: 6c33f63298b1f75ddab732febb4987755760dc2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hive.metastore;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.login.LoginException;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A thread safe time expired cache for HiveMetaStoreClient
*/
/**
 * A thread safe time expired cache for HiveMetaStoreClient.
 *
 * <p>Because {@link HiveMetaStoreClient} is not thread-safe, clients are keyed by
 * (metastore URIs, UGI, thread-id) so a client is never shared across threads. Entries
 * expire after {@code timeout} seconds of inactivity and are torn down once no caller
 * holds them (reference-counted via {@link CacheableHiveMetaStoreClient#acquire()} /
 * {@link CacheableHiveMetaStoreClient#close()}).</p>
 */
class HiveClientCache {
  public static final int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
  public static final int DEFAULT_HIVE_CACHE_INITIAL_CAPACITY = 50;
  public static final int DEFAULT_HIVE_CACHE_MAX_CAPACITY = 50;
  public static final boolean DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED = false;

  private final Cache<HiveClientCacheKey, ICacheableMetaStoreClient> hiveCache;
  private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class);
  private final int timeout;
  // This lock is used to make sure removalListener won't close a client that is being contemplated for returning by get()
  private final Object CACHE_TEARDOWN_LOCK = new Object();

  private static final AtomicInteger nextId = new AtomicInteger(0);

  private final ScheduledFuture<?> cleanupHandle; // used to cleanup cache
  private boolean enableStats;

  // Since HiveMetaStoreClient is not threadsafe, hive clients are not shared across threads.
  // Thread local variable containing each thread's unique ID, is used as one of the keys for the cache
  // causing each thread to get a different client even if the conf is same.
  private static final ThreadLocal<Integer> threadId =
      new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
          return nextId.getAndIncrement();
        }
      };

  private int getThreadId() {
    return threadId.get();
  }

  /**
   * Returns a brand new, uncached metastore client. Callers own its lifecycle.
   */
  public static IMetaStoreClient getNonCachedHiveMetastoreClient(HiveConf hiveConf) throws MetaException {
    return RetryingMetaStoreClient.getProxy(hiveConf, true);
  }

  public HiveClientCache(HiveConf hiveConf) {
    this((int) HiveConf.getTimeVar(hiveConf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_EXPIRY_TIME, TimeUnit.SECONDS),
        HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_INITIAL_CAPACITY),
        HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_MAX_CAPACITY),
        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_STATS_ENABLED));
  }

  /**
   * @deprecated This constructor will be made private or removed as more configuration properties are required.
   */
  @Deprecated
  public HiveClientCache(final int timeout) {
    this(timeout, DEFAULT_HIVE_CACHE_INITIAL_CAPACITY, DEFAULT_HIVE_CACHE_MAX_CAPACITY, DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED);
  }

  /**
   * @param timeout the length of time in seconds after a client is created that it should be automatically removed
   * @param initialCapacity initial capacity hint for the underlying Guava cache
   * @param maxCapacity maximum number of cached clients before size-based eviction
   * @param enableStats whether to log Guava cache statistics on cleanup/shutdown
   */
  private HiveClientCache(final int timeout, final int initialCapacity, final int maxCapacity, final boolean enableStats) {
    this.timeout = timeout;
    this.enableStats = enableStats;

    LOG.info("Initializing cache: eviction-timeout={} initial-capacity={} maximum-capacity={}",
        timeout, initialCapacity, maxCapacity);

    CacheBuilder<HiveClientCacheKey, ICacheableMetaStoreClient> builder = CacheBuilder.newBuilder()
        .initialCapacity(initialCapacity)
        .maximumSize(maxCapacity)
        .expireAfterAccess(timeout, TimeUnit.SECONDS)
        .removalListener(createRemovalListener());

    /*
     * Guava versions <12.0 have stats collection enabled by default and do not expose a recordStats method.
     * Check for newer versions of the library and ensure that stats collection is enabled by default.
     */
    try {
      // Note: no-arg getMethod/invoke rather than passing a null varargs array, which is
      // ambiguous to the compiler and easy to misread.
      java.lang.reflect.Method m = builder.getClass().getMethod("recordStats");
      m.invoke(builder);
    } catch (NoSuchMethodException e) {
      LOG.debug("Using a version of guava <12.0. Stats collection is enabled by default.");
    } catch (Exception e) {
      LOG.warn("Unable to invoke recordStats method.", e);
    }

    this.hiveCache = builder.build();

    /*
     * We need to use a cleanup interval, which is how often the cleanup thread will kick in
     * and go do a check to see if any of the connections can be expired. We don't want to
     * do this too often, because it'd be like having a mini-GC going off every so often,
     * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS. If the client
     * has explicitly set a larger timeout on the cache, though, we respect that, and use that
     */
    long cleanupInterval = timeout > DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS ? timeout : DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS;

    this.cleanupHandle = createCleanupThread(cleanupInterval);

    createShutdownHook();
  }

  /**
   * Builds the listener that tears down a client when it is evicted from the cache,
   * provided no thread is still using it.
   */
  private RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> createRemovalListener() {
    RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> listener =
        new RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient>() {
          @Override
          public void onRemoval(RemovalNotification<HiveClientCacheKey, ICacheableMetaStoreClient> notification) {
            ICacheableMetaStoreClient hiveMetaStoreClient = notification.getValue();
            if (hiveMetaStoreClient != null) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Evicting client: {}", Integer.toHexString(System.identityHashCode(hiveMetaStoreClient)));
              }
              // TODO: This global lock may not be necessary as all concurrent methods in ICacheableMetaStoreClient
              // are synchronized.
              synchronized (CACHE_TEARDOWN_LOCK) {
                hiveMetaStoreClient.setExpiredFromCache();
                hiveMetaStoreClient.tearDownIfUnused();
              }
            }
          }
        };
    return listener;
  }

  /**
   * Schedules the periodic {@link #cleanup()} on a single daemon thread.
   *
   * @param interval seconds between cleanup passes
   * @return handle used by the shutdown hook to cancel the maintenance task
   */
  private ScheduledFuture<?> createCleanupThread(long interval) {
    // Add a maintenance thread that will attempt to trigger a cache clean continuously
    Runnable cleanupThread = new Runnable() {
      @Override
      public void run() {
        cleanup();
      }
    };

    /*
     * Create the cleanup handle. In addition to cleaning up every cleanupInterval, we add
     * a slight offset, so that the very first time it runs, it runs with a slight delay, so
     * as to catch any other connections that were closed when the first timeout happened.
     * As a result, the time we can expect an unused connection to be reaped is
     * 5 seconds after the first timeout, and then after that, it'll check for whether or not
     * it can be cleaned every max(DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS,timeout) seconds
     */
    ThreadFactory daemonThreadFactory = (new ThreadFactoryBuilder()).setDaemon(true)
        .setNameFormat("HiveClientCache-cleaner-%d")
        .build();

    return Executors.newScheduledThreadPool(1, daemonThreadFactory)
        .scheduleWithFixedDelay(cleanupThread, timeout + 5, interval, TimeUnit.SECONDS);
  }

  private void createShutdownHook() {
    // Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up.
    // This is the best effort approach. Ignore any error while doing so. Notice that most of the clients
    // would get cleaned up via either the removalListener or the close() call, only the active clients
    // that are in the cache or expired but being used in other threads wont get cleaned. The following code will only
    // clean the active cache ones. The ones expired from cache but being hold by other threads are in the mercy
    // of finalize() being called.
    Thread cleanupHiveClientShutdownThread = new Thread() {
      @Override
      public void run() {
        LOG.debug("Cleaning up hive client cache in ShutDown hook");
        cleanupHandle.cancel(false); // Cancel the maintenance thread.
        closeAllClientsQuietly();
      }
    };
    ShutdownHookManager.addShutdownHook(cleanupHiveClientShutdownThread);
  }

  /**
   * Note: This doesn't check if they are being used or not, meant only to be called during shutdown etc.
   */
  void closeAllClientsQuietly() {
    try {
      ConcurrentMap<HiveClientCacheKey, ICacheableMetaStoreClient> elements = hiveCache.asMap();
      for (ICacheableMetaStoreClient cacheableHiveMetaStoreClient : elements.values()) {
        cacheableHiveMetaStoreClient.tearDown();
      }
    } catch (Exception e) {
      LOG.warn("Clean up of hive clients in the cache failed. Ignored", e);
    }

    if (this.enableStats) {
      LOG.info("Cache statistics after shutdown: size={} {}", hiveCache.size(), hiveCache.stats());
    }
  }

  /**
   * Triggers the Guava cache's maintenance pass so expired entries are evicted
   * (and hence torn down via the removal listener) promptly.
   */
  public void cleanup() {
    // TODO: periodically reload a new HiveConf to check if stats reporting is enabled.
    hiveCache.cleanUp();

    if (enableStats) {
      LOG.info("Cache statistics after cleanup: size={} {}", hiveCache.size(), hiveCache.stats());
    }
  }

  /**
   * Returns a cached client if exists or else creates one, caches and returns it. It also checks that the client is
   * healthy and can be reused
   * @param hiveConf
   * @return the hive client
   * @throws MetaException
   * @throws IOException
   * @throws LoginException
   */
  public IMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException {
    final HiveClientCacheKey cacheKey = HiveClientCacheKey.fromHiveConf(hiveConf, getThreadId());
    ICacheableMetaStoreClient cacheableHiveMetaStoreClient = null;

    // the hmsc is not shared across threads. So the only way it could get closed while we are doing healthcheck
    // is if removalListener closes it. The synchronization takes care that removalListener won't do it
    synchronized (CACHE_TEARDOWN_LOCK) {
      cacheableHiveMetaStoreClient = getOrCreate(cacheKey);
      cacheableHiveMetaStoreClient.acquire();
    }

    if (!cacheableHiveMetaStoreClient.isOpen()) {
      // The cached client went stale (e.g. connection dropped); replace it with a fresh one.
      synchronized (CACHE_TEARDOWN_LOCK) {
        hiveCache.invalidate(cacheKey);
        cacheableHiveMetaStoreClient.close();
        cacheableHiveMetaStoreClient = getOrCreate(cacheKey);
        cacheableHiveMetaStoreClient.acquire();
      }
    }
    return cacheableHiveMetaStoreClient;
  }

  /**
   * Return from cache if exists else create/cache and return
   * @param cacheKey
   * @return the cached (or newly created and cached) client
   * @throws IOException
   * @throws MetaException
   * @throws LoginException
   */
  private ICacheableMetaStoreClient getOrCreate(final HiveClientCacheKey cacheKey)
      throws IOException, MetaException, LoginException {
    try {
      return hiveCache.get(cacheKey, new Callable<ICacheableMetaStoreClient>() {
        @Override
        public ICacheableMetaStoreClient call() throws MetaException {
          // This is called from HCat, so always allow embedded metastore (as was the default).
          return
              (ICacheableMetaStoreClient) RetryingMetaStoreClient.getProxy(cacheKey.getHiveConf(),
                  new Class<?>[]{HiveConf.class, Integer.class, Boolean.class},
                  new Object[]{cacheKey.getHiveConf(), timeout, true},
                  CacheableHiveMetaStoreClient.class.getName());
        }
      });
    } catch (ExecutionException e) {
      // Unwrap the loader's exception into the types declared by this method.
      Throwable t = e.getCause();
      if (t instanceof IOException) {
        throw (IOException) t;
      } else if (t instanceof MetaException) {
        throw (MetaException) t;
      } else if (t instanceof LoginException) {
        throw (LoginException) t;
      } else {
        throw new IOException("Error creating hiveMetaStoreClient", t);
      }
    }
  }

  /**
   * A class to wrap HiveConf and expose equality based only on UserGroupInformation and the metaStoreURIs.
   * This becomes the key for the cache and this way the same HiveMetaStoreClient would be returned if
   * UserGroupInformation and metaStoreURIs are same. This function can evolve to express
   * the cases when HiveConf is different but the same hiveMetaStoreClient can be used
   */
  static class HiveClientCacheKey {
    private final String metaStoreURIs;
    private final UserGroupInformation ugi;
    private final HiveConf hiveConf;
    private final int threadId;

    private HiveClientCacheKey(HiveConf hiveConf, final int threadId) throws IOException, LoginException {
      this.metaStoreURIs = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
      ugi = Utils.getUGI();
      this.hiveConf = hiveConf;
      this.threadId = threadId;
    }

    public static HiveClientCacheKey fromHiveConf(HiveConf hiveConf, final int threadId) throws IOException, LoginException {
      return new HiveClientCacheKey(hiveConf, threadId);
    }

    public HiveConf getHiveConf() {
      return hiveConf;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      HiveClientCacheKey that = (HiveClientCacheKey) o;
      // Note: hiveConf deliberately excluded — equality is URI + UGI + thread only.
      return new EqualsBuilder().
          append(this.metaStoreURIs,
              that.metaStoreURIs).
          append(this.ugi, that.ugi).
          append(this.threadId, that.threadId).isEquals();
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().
          append(metaStoreURIs).
          append(ugi).
          append(threadId).toHashCode();
    }

    @Override
    public String toString() {
      return "HiveClientCacheKey: uri=" + this.metaStoreURIs + " ugi=" + this.ugi + " thread=" + this.threadId;
    }
  }

  @InterfaceAudience.Private
  public interface ICacheableMetaStoreClient extends IMetaStoreClient {
    @NoReconnect
    void acquire();

    @NoReconnect
    void setExpiredFromCache();

    @NoReconnect
    AtomicInteger getUsers();

    @NoReconnect
    boolean isClosed();

    /**
     * @deprecated This method is not used internally and should not be visible through HCatClient.create.
     */
    @Deprecated
    @NoReconnect
    boolean isOpen();

    @NoReconnect
    void tearDownIfUnused();

    @NoReconnect
    void tearDown();
  }

  /**
   * Add # of current users on HiveMetaStoreClient, so that the client can be cleaned when no one is using it.
   */
  static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient implements ICacheableMetaStoreClient {

    private final AtomicInteger users = new AtomicInteger(0);
    private volatile boolean expiredFromCache = false;
    // volatile: read by isClosed()/tearDown() from the removal listener, shutdown hook
    // and finalizer threads without holding this object's monitor.
    private volatile boolean isClosed = false;

    CacheableHiveMetaStoreClient(final HiveConf conf, final Integer timeout, Boolean allowEmbedded)
        throws MetaException {
      super(conf, null, allowEmbedded);
    }

    /**
     * Increments the user count and optionally renews the expiration time.
     * <code>renew</code> should correspond with the expiration policy of the cache.
     * When the policy is <code>expireAfterAccess</code>, the expiration time should be extended.
     * When the policy is <code>expireAfterWrite</code>, the expiration time should not be extended.
     * A mismatch with the policy will lead to closing the connection unnecessarily after the initial
     * expiration time is generated.
     */
    public synchronized void acquire() {
      // Use the value this increment produced, rather than re-reading the counter.
      int count = users.incrementAndGet();
      if (count > 1) {
        LOG.warn("Unexpected increment of user count beyond one: {} {}", count, this);
      }
    }

    /**
     * Decrements the user count. Only called from the synchronized {@link #close()}.
     */
    private void release() {
      if (users.get() > 0) {
        users.decrementAndGet();
      } else {
        LOG.warn("Unexpected attempt to decrement user count of zero: {} {}", users.get(), this);
      }
    }

    /**
     * Communicate to the client that it is no longer in the cache.
     * The expiration time should be voided to allow the connection to be closed at the first opportunity.
     */
    public synchronized void setExpiredFromCache() {
      if (users.get() != 0) {
        LOG.warn("Evicted client has non-zero user count: {}", users.get());
      }
      expiredFromCache = true;
    }

    public boolean isClosed() {
      return isClosed;
    }

    /*
     * Used only for Debugging or testing purposes
     */
    public AtomicInteger getUsers() {
      return users;
    }

    /**
     * Make a call to hive meta store and see if the client is still usable. Some calls where the user provides
     * invalid data renders the client unusable for future use (example: create a table with very long table name)
     * @return true if the client can still reach the metastore, false otherwise
     */
    @Deprecated
    public boolean isOpen() {
      try {
        // Look for an unlikely database name and see if either MetaException or TException is thrown
        super.getDatabases("NonExistentDatabaseUsedForHealthCheck");
      } catch (TException e) {
        return false;
      }
      return true;
    }

    /**
     * Decrement the user count and piggyback this to set expiry flag as well, then teardown(), if conditions are met.
     * This *MUST* be called by anyone who uses this client.
     */
    @Override
    public synchronized void close() {
      release();
      tearDownIfUnused();
    }

    /**
     * Attempt to tear down the client connection.
     * The connection will be closed if the following conditions hold:
     *  1. There are no active user holding the client.
     *  2. The client has been evicted from the cache.
     */
    public synchronized void tearDownIfUnused() {
      if (users.get() != 0) {
        LOG.warn("Non-zero user count preventing client tear down: users={} expired={}", users.get(), expiredFromCache);
      }

      if (users.get() == 0 && expiredFromCache) {
        this.tearDown();
      }
    }

    /**
     * Close the underlying objects irrespective of whether they are in use or not.
     */
    public void tearDown() {
      try {
        if (!isClosed) {
          super.close();
        }
        isClosed = true;
      } catch (Exception e) {
        LOG.warn("Error closing hive metastore client. Ignored.", e);
      }
    }

    @Override
    public String toString() {
      return "HCatClient: thread: " + Thread.currentThread().getId() + " users=" + users.get()
          + " expired=" + expiredFromCache + " closed=" + isClosed;
    }

    /**
     * GC is attempting to destroy the object.
     * No one references this client anymore, so it can be torn down without worrying about user counts.
     * @throws Throwable
     */
    @Override
    protected void finalize() throws Throwable {
      if (users.get() != 0) {
        LOG.warn("Closing client with non-zero user count: users={} expired={}", users.get(), expiredFromCache);
      }

      try {
        this.tearDown();
      } finally {
        super.finalize();
      }
    }
  }
}