blob: 9053c5661ba2fd1ae3bef0e37be80e84ac85ff27 [file] [log] [blame]
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.apache.tajo.catalog.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.*;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Manages a pool of HiveMetaStoreClient connections. If the connection pool is empty
* a new client is created and added to the pool. There is no size limit.
*/
public class HiveCatalogStoreClientPool {
private static final Logger LOG = Logger.getLogger(HiveCatalogStoreClientPool.class);
private final ConcurrentLinkedQueue<HiveCatalogStoreClient> clientPool =
new ConcurrentLinkedQueue<HiveCatalogStoreClient>();
private AtomicBoolean poolClosed = new AtomicBoolean(false);
private HiveConf hiveConf;
/**
* A wrapper around the HiveMetaStoreClient that manages interactions with the
* connection pool.
*/
public class HiveCatalogStoreClient {
private final IMetaStoreClient hiveClient;
public AtomicBoolean isInUse = new AtomicBoolean(false);
private HiveCatalogStoreClient(HiveConf hiveConf) {
try {
HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
@Override
public HiveMetaHook getHook(Table table) throws MetaException {
/* metadata hook implementation, or null if this
* storage handler does not need any metadata notifications
*/
return null;
}
};
this.hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName());
clientPool.add(this);
LOG.info("MetaStoreClient created (size = " + clientPool.size() + ")");
} catch (Exception e) {
// Turn in to an unchecked exception
throw new IllegalStateException(e);
}
}
/**
* Returns the internal HiveMetaStoreClient object.
*/
public IMetaStoreClient getHiveClient() {
return hiveClient;
}
/**
* Returns this client back to the connection pool. If the connection pool has been
* closed, just close the Hive client connection.
*/
public synchronized void release() {
if(!this.isInUse.getAndSet(false)){
return;
}
// Ensure the connection isn't returned to the pool if the pool has been closed.
// This lock is needed to ensure proper behavior when a thread reads poolClosed
// is false, but a call to pool.close() comes in immediately afterward.
if (poolClosed.get()) {
this.getHiveClient().close();
} else {
clientPool.add(this);
}
}
// Marks this client as in use
private void markInUse() {
isInUse.set(true);
}
}
public HiveCatalogStoreClientPool(int initialSize) {
this(initialSize, new HiveConf(HiveCatalogStoreClientPool.class));
}
public HiveCatalogStoreClientPool(int initialSize, HiveConf hiveConf) {
this.hiveConf = hiveConf;
addClients(initialSize);
}
public HiveCatalogStoreClientPool(int initialSize, Configuration conf) {
this.hiveConf = new HiveConf();
setParameters(conf);
addClients(initialSize);
}
public void setParameters(Configuration conf) {
for( Iterator<Entry<String, String>> iter = conf.iterator(); iter.hasNext();) {
Map.Entry<String, String> entry = iter.next();
this.hiveConf.set(entry.getKey(), entry.getValue());
}
}
/**
* Add numClients to the client pool.
*/
public void addClients(int numClients) {
for (int i = 0; i < numClients; ++i) {
clientPool.add(new HiveCatalogStoreClient(hiveConf));
}
}
/**
* Gets a client from the pool. If the pool is empty a new client is created.
*/
public synchronized HiveCatalogStoreClient getClient() {
// The MetaStoreClient c'tor relies on knowing the Hadoop version by asking
// org.apache.hadoop.util.VersionInfo. The VersionInfo class relies on opening
// the 'common-version-info.properties' file as a resource from hadoop-common*.jar
// using the Thread's context classloader. If necessary, set the Thread's context
// classloader, otherwise VersionInfo will fail in it's c'tor.
if (Thread.currentThread().getContextClassLoader() == null) {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
}
HiveCatalogStoreClient client = clientPool.poll();
// The pool was empty so create a new client and return that.
if (client == null) {
client = new HiveCatalogStoreClient(hiveConf);
}
client.markInUse();
return client;
}
/**
* Removes all items from the connection pool and closes all Hive Meta Store client
* connections. Can be called multiple times.
*/
public void close() {
// Ensure no more items get added to the pool once close is called.
if (poolClosed.getAndSet(true)) {
return;
}
HiveCatalogStoreClient client = null;
while ((client = clientPool.poll()) != null) {
client.getHiveClient().close();
}
}
}