blob: 80a8165d3eea0263e93d34413d8e484ad4b2039b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.core.common.transport;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.utils.ThriftUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Pool of transport connections to Sentry servers.
* The pool caches open connections to multiple Sentry servers,
* specified in the configuration.
*
* When transport pooling is disabled in configuration,
* creates transports directly and doesn't cache connections.
*/
@ThreadSafe
public final class SentryTransportPool implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportPool.class);
// Used for logging to identify pool instances. This is only useful for test debugging
// so we do not preserve thread safety for this field.
private static int poolId = 0;
private final int id;
// True if using Object pool
private final boolean isPoolEnabled;
// Load balance between servers if true
private final boolean doLoadBalancing;
// List of all known servers
private final ArrayList<HostAndPort> endpoints;
// Transport pool which keeps connected transports
private final KeyedObjectPool<HostAndPort, TTransportWrapper> pool;
// Source of connected transports
private final TransportFactory transportFactory;
// Set when we are closed
private final AtomicBoolean closed = new AtomicBoolean();
/**
* Configure transport pool.
* <p>
* The pool accepts the following configuration:
* <ul>
* <li>Maximum total number of objects in the pool</li>
* <li>Minimum number of idle objects</li>
* <li>Maximum number of idle objects</li>
* <li>Minimum time before the object is evicted</li>
* <li>Interval between evictions</li>
* </ul>
* @param conf Configuration
* @param transportConfig Configuration interface
* @param transportFactory Transport factory used to produce transports
*/
public SentryTransportPool(Configuration conf,
SentryClientTransportConfigInterface transportConfig,
TransportFactory transportFactory) {
// This isn't thread-safe, but we don't care - it is only used
// for debugging when running tests - normal apps use a single pool
poolId++;
id = poolId;
this.transportFactory = transportFactory;
doLoadBalancing = transportConfig.isLoadBalancingEnabled(conf);
isPoolEnabled = transportConfig.isTransportPoolEnabled(conf);
// Get list of server addresses
String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
int serverPort = transportConfig.getServerRpcPort(conf);
LOGGER.info("Creating pool for {} with default port {}",
hostsAndPortsStr, serverPort);
String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
Preconditions.checkArgument(hostsAndPortsStrArr.length > 0,
"At least one server should be specified");
endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
for(String addr: hostsAndPortsStrArr) {
HostAndPort endpoint = ThriftUtil.parseAddress(addr, serverPort);
LOGGER.info("Adding endpoint {}", endpoint);
endpoints.add(endpoint);
}
if (!isPoolEnabled) {
pool = null;
LOGGER.info("Connection pooling is disabled");
return;
}
LOGGER.info("Connection pooling is enabled");
// Set pool configuration based on Configuration settings
GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
// Don't limit maximum number of objects in the pool
poolConfig.setMaxTotal(-1);
poolConfig.setMinIdlePerKey(transportConfig.getPoolMinIdle(conf));
poolConfig.setMaxIdlePerKey(transportConfig.getPoolMaxIdle(conf));
// Do not block when pool is exhausted, throw exception instead
poolConfig.setBlockWhenExhausted(false);
poolConfig.setTestOnReturn(true);
// No limit for total objects in the pool
poolConfig.setMaxTotalPerKey(transportConfig.getPoolMaxTotal(conf));
poolConfig.setMinEvictableIdleTimeMillis(transportConfig.getMinEvictableTimeSec(conf));
poolConfig.setTimeBetweenEvictionRunsMillis(transportConfig.getTimeBetweenEvictionRunsSec(conf));
// Create object pool
pool = new GenericKeyedObjectPool<>(new PoolFactory(this.transportFactory, id),
poolConfig);
}
/**
* Get an open transport instance.
* The instance can be connected to any of the available servers.
* We are trying to randomly load-balance between servers (unless it is
* disabled in configuration).
*
* @return connected transport
* @throws Exception if connection tto both servers fails
*/
public TTransportWrapper getTransport() throws Exception {
List<HostAndPort> servers;
// If we are doing load balancing and there is more then one server,
// shuffle them before obtaining connection
if (doLoadBalancing && (endpoints.size() > 1)) {
servers = new ArrayList<>(endpoints);
Collections.shuffle(servers);
} else {
servers = endpoints;
}
// Try to get a connection from one of the pools.
Exception failure = null;
boolean ignoreEmptyPool = true;
for (int attempt = 0; attempt < 2; attempt++) {
// First only attempt to borrow from pools which have some idle connections
// If this fails, try with all pools
for (HostAndPort addr : servers) {
if (isPoolEnabled && ignoreEmptyPool && (pool.getNumIdle(addr) == 0)) {
LOGGER.debug("Ignoring empty pool {}", addr);
ignoreEmptyPool = false;
continue;
}
try {
TTransportWrapper transport =
isPoolEnabled ?
pool.borrowObject(addr) :
transportFactory.getTransport(addr);
LOGGER.debug("[{}] obtained transport {}", id, transport);
if (LOGGER.isDebugEnabled() && isPoolEnabled) {
LOGGER.debug("Currently {} active connections, {} idle connections",
pool.getNumActive(), pool.getNumIdle());
}
return transport;
} catch (IllegalStateException e) {
// Should not happen
LOGGER.error("Unexpected error from pool {}", id, e);
failure = e;
} catch (Exception e) {
LOGGER.error("Failed to obtain transport for {}: {}",
addr, e.getMessage());
failure = e;
}
}
ignoreEmptyPool = false;
}
// Failed to borrow connect to any endpoint
assert failure != null;
throw failure;
}
/**
* Return transport to the pool
* @param transport Open transport
*/
public void returnTransport(TTransportWrapper transport) {
if (closed.get()) {
LOGGER.debug("Returned {} to closed pool", transport);
transport.close();
return;
}
try {
if (isPoolEnabled) {
LOGGER.debug("[{}] returning {}", id, transport);
pool.returnObject(transport.getAddress(), transport);
} else {
LOGGER.debug("Closing {}", transport);
transport.close();
}
} catch (Exception e) {
LOGGER.error("Failed to return {}", transport, e);
}
}
public void invalidateTransport(TTransportWrapper transport) {
if (closed.get()) {
LOGGER.debug("invalidated {} for closed pool", transport);
transport.close();
return;
}
try {
LOGGER.debug("[{}] Invalidating address {}", id, transport);
if (!isPoolEnabled) {
transport.close();
} else {
pool.invalidateObject(transport.getAddress(), transport);
// Invalidate the whole pool associated with the given address
// It is a bit brutal since a single bad connection may
// cause an invalidation, but otherwise we may have a lot of bad
// connections in the pool and try to return them.
pool.clear(transport.getAddress());
}
} catch (Exception e) {
LOGGER.error("Failed to invalidate {}", transport, e);
}
}
@Override
public void close() throws Exception {
if (closed.get()) {
// already closed
return;
}
LOGGER.debug("[{}] closing", id);
if (pool != null) {
LOGGER.debug("Closing pool of {}/{} endpoints",
pool.getNumIdle(), pool.getNumActive());
pool.close();
}
}
/**
* Factory that creates and destroys pool objects
*/
private static final class PoolFactory
extends BaseKeyedPooledObjectFactory<HostAndPort, TTransportWrapper> {
private final TransportFactory transportFactory;
private final int id;
/**
* Create a pool factory associated with the given transport factory
* @param transportFactory - factory producing transports
* @param id pool id (for debugging)
*/
private PoolFactory(TransportFactory transportFactory, int id) {
this.transportFactory = transportFactory;
this.id = id;
}
@Override
public boolean validateObject(HostAndPort key, PooledObject<TTransportWrapper> p) {
TTransportWrapper transport = p.getObject();
if (transport == null) {
LOGGER.error("No transport to validate");
return false;
}
if (transport.getAddress() != key) {
LOGGER.error("Invalid endpoint {}: does not match {}", transport, key);
return false;
}
return true;
}
@Override
public TTransportWrapper create(HostAndPort key) throws Exception {
TTransportWrapper transportWrapper = transportFactory.getTransport(key);
LOGGER.debug("[{}] created {}", id, transportWrapper);
return transportWrapper;
}
@Override
public void destroyObject(HostAndPort key, PooledObject<TTransportWrapper> p) throws Exception {
TTransportWrapper transport = p.getObject();
if (transport != null) {
LOGGER.debug("[{}] Destroying endpoint {}", id, transport);
try {
transport.close();
} catch (RuntimeException e) {
LOGGER.error("fail to destroy endpoint {}", transport, e);
}
}
}
@Override
public PooledObject<TTransportWrapper> wrap(TTransportWrapper value) {
return new DefaultPooledObject<>(value);
}
}
}