blob: 67ac9c93efa3fb198c0d7e595fb8e573481b78ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.jdbc;
import static com.google.common.base.Preconditions.checkNotNull;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.cache.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesImpl;
import org.apache.phoenix.query.QueryServicesOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
*
* JDBC Driver implementation of Phoenix for production.
* To use this driver, specify the following URL:
* jdbc:phoenix:<zookeeper quorum server name>;
* Only an embedded driver is currently supported (Phoenix client
* runs in the same JVM as the driver). Connections are lightweight
* and are not pooled. The last part of the URL, the hbase zookeeper
* quorum server name, determines the hbase cluster to which queries
* will be routed.
*
*
* @since 0.1
*/
public final class PhoenixDriver extends PhoenixEmbeddedDriver {
private static final Logger logger = LoggerFactory.getLogger(PhoenixDriver.class);
public static final PhoenixDriver INSTANCE;
private static volatile String driverShutdownMsg;
static {
try {
INSTANCE = new PhoenixDriver();
try {
// Add shutdown hook to release any resources that were never closed
// In theory not necessary, but it won't hurt anything
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
final Configuration config =
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
final ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("PHOENIX-DRIVER-SHUTDOWNHOOK" + "-thread-%s")
.build();
final ExecutorService svc =
Executors.newSingleThreadExecutor(threadFactory);
try {
Future<?> future = svc.submit(new Runnable() {
@Override
public void run() {
closeInstance(INSTANCE);
}
});
// Pull the timeout value (default 5s).
long millisBeforeShutdown =
config.getLong(QueryServices.DRIVER_SHUTDOWN_TIMEOUT_MS,
QueryServicesOptions.DEFAULT_DRIVER_SHUTDOWN_TIMEOUT_MS);
// Close with a timeout. If this is running, we know the JVM wants to
// go down. There may be other threads running that are holding the
// lock. We don't want to be blocked on them (for the normal HBase retry
// policy). We don't care about any exceptions, we're going down anyways.
future.get(millisBeforeShutdown, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
logger.warn("Failed to close instance", e);
} catch (InterruptedException e) {
logger.warn("Interrupted waiting to close instance", e);
} catch (TimeoutException e) {
logger.warn("Timed out waiting to close instance", e);
} finally {
// We're going down, but try to clean up.
svc.shutdownNow();
}
}
});
// Only register the driver when we successfully register the shutdown hook
// Don't want to register it if we're already in the process of going down.
DriverManager.registerDriver(INSTANCE);
} catch (IllegalStateException e) {
logger.warn("Failed to register PhoenixDriver shutdown hook as the JVM is already shutting down");
// Close the instance now because we don't have the shutdown hook
closeInstance(INSTANCE);
throw e;
}
} catch (SQLException e) {
throw new IllegalStateException("Unable to register " + PhoenixDriver.class.getName() + ": "+ e.getMessage());
}
}
private static void closeInstance(PhoenixDriver instance) {
try {
instance.close();
} catch (SQLException e) {
logger.warn("Unable to close PhoenixDriver on shutdown", e);
} finally {
driverShutdownMsg = "Phoenix driver closed because server is shutting down";
}
}
// One entry per cluster here
private final Cache<ConnectionInfo, ConnectionQueryServices> connectionQueryServicesCache =
initializeConnectionCache();
public PhoenixDriver() { // for Squirrel
// Use production services implementation
super();
}
private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() {
Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS,
QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION);
RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener =
new RemovalListener<ConnectionInfo, ConnectionQueryServices>() {
@Override
public void onRemoval(RemovalNotification<ConnectionInfo, ConnectionQueryServices> notification) {
String connInfoIdentifier = notification.getKey().toString();
logger.debug("Expiring " + connInfoIdentifier + " because of "
+ notification.getCause().name());
try {
notification.getValue().close();
}
catch (SQLException se) {
logger.error("Error while closing expired cache connection " + connInfoIdentifier, se);
}
}
};
return CacheBuilder.newBuilder()
.expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS)
.removalListener(cacheRemovalListener)
.build();
}
// writes guarded by "this"
private volatile QueryServices services;
@GuardedBy("closeLock")
private volatile boolean closed = false;
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
@Override
public QueryServices getQueryServices() throws SQLException {
try {
lockInterruptibly(LockMode.READ);
checkClosed();
// Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
// object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
// made at driver initialization time which is too early for some systems.
QueryServices result = services;
if (result == null) {
synchronized(this) {
result = services;
if(result == null) {
services = result = new QueryServicesImpl(getDefaultProps());
}
}
}
return result;
} finally {
unlock(LockMode.READ);
}
}
@Override
public boolean acceptsURL(String url) throws SQLException {
// Accept the url only if test=true attribute not set
return super.acceptsURL(url) && !isTestUrl(url);
}
@Override
public Connection connect(String url, Properties info) throws SQLException {
if (!acceptsURL(url)) {
return null;
}
try {
lockInterruptibly(LockMode.READ);
checkClosed();
return createConnection(url, info);
} finally {
unlock(LockMode.READ);
}
}
@Override
protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties info) throws SQLException {
try {
lockInterruptibly(LockMode.READ);
checkClosed();
ConnectionInfo connInfo = ConnectionInfo.create(url);
SQLException sqlE = null;
boolean success = false;
final QueryServices services = getQueryServices();
ConnectionQueryServices connectionQueryServices = null;
// Also performs the Kerberos login if the URL/properties request this
final ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
try {
connectionQueryServices =
connectionQueryServicesCache.get(normalizedConnInfo, new Callable<ConnectionQueryServices>() {
@Override
public ConnectionQueryServices call() throws Exception {
ConnectionQueryServices connectionQueryServices;
if (normalizedConnInfo.isConnectionless()) {
connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
} else {
connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
}
return connectionQueryServices;
}
});
connectionQueryServices.init(url, info);
success = true;
} catch (ExecutionException ee){
if (ee.getCause() instanceof SQLException) {
sqlE = (SQLException) ee.getCause();
} else {
throw new SQLException(ee);
}
}
catch (SQLException e) {
sqlE = e;
}
finally {
if (!success) {
// Remove from map, as initialization failed
connectionQueryServicesCache.invalidate(normalizedConnInfo);
if (sqlE != null) {
throw sqlE;
}
}
}
return connectionQueryServices;
} finally {
unlock(LockMode.READ);
}
}
@GuardedBy("closeLock")
private void checkClosed() {
if (closed) {
throwDriverClosedException();
}
}
private void throwDriverClosedException() {
throw new IllegalStateException(driverShutdownMsg != null ? driverShutdownMsg : "The Phoenix jdbc driver has been closed.");
}
@Override
public synchronized void close() throws SQLException {
try {
lockInterruptibly(LockMode.WRITE);
if (closed) {
return;
}
closed = true;
} finally {
unlock(LockMode.WRITE);
}
if (services != null) {
try {
services.close();
} finally {
services = null;
}
}
}
private enum LockMode {
READ, WRITE
};
private void lockInterruptibly(LockMode mode) throws SQLException {
checkNotNull(mode);
switch (mode) {
case READ:
try {
closeLock.readLock().lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException();
}
break;
case WRITE:
try {
closeLock.writeLock().lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException();
}
}
}
private void unlock(LockMode mode) {
checkNotNull(mode);
switch (mode) {
case READ:
closeLock.readLock().unlock();
break;
case WRITE:
closeLock.writeLock().unlock();
}
}
}