blob: acf9b05281d8456ebdf77414dc7886f9aa4ef28f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.service.thrift;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.google.common.net.HostAndPort;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.transport.SentryClientInvocationHandler;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The PoolClientInvocationHandler is a proxy class for handling thrift
* call. For every thrift call, get the instance of
* SentryPolicyServiceBaseClient from the commons-pool, and return the instance
* to the commons-pool after complete the call. For any exception with the call,
* discard the instance and create a new one added to the commons-pool. Then,
* get the instance and do the call again. For the thread safe, the commons-pool
* will manage the connection pool, and every thread can get the connection by
* borrowObject() and return the connection to the pool by returnObject().
*
* TODO: Current pool model does not manage the opening connections very well,
* e.g. opening connections with failed servers should be closed promptly.
*/
public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PoolClientInvocationHandler.class);
private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
private final Configuration conf;
/**
* The configuration to use for our object pools.
* Null if we are not using object pools.
*/
private final GenericObjectPoolConfig poolConfig;
/**
* The total number of connection retries to attempt per endpoint.
*/
private final int connectionRetryTotal;
/**
* The configured sentry servers.
*/
private final Endpoint[] endpoints;
/**
* The endpoint which we are currently using. This can be read without any locks.
* It must be written while holding the endpoints lock.
*/
private volatile int freshestEndpointIdx = 0;
private class Endpoint {
/**
* The server address or hostname.
*/
private final String addr;
/**
* The server port.
*/
private final int port;
/**
* The server's poolFactory used to create new clients.
*/
private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
/**
* The server's pool of cached clients.
*/
private final GenericObjectPool<SentryPolicyServiceClient> pool;
Endpoint(String addr, int port) {
this.addr = addr;
this.port = port;
this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
this.poolFactory, poolConfig, new AbandonedConfig());
}
GenericObjectPool<SentryPolicyServiceClient> getPool() {
return pool;
}
String getEndPointStr() {
return new String("endpoint at [address " + addr + ", port " + port + "]");
}
}
public PoolClientInvocationHandler(Configuration conf) throws Exception {
this.conf = conf;
this.poolConfig = new GenericObjectPoolConfig();
// config the pool size for commons-pool
this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL,
ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE,
ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE,
ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
// get the retry number for reconnecting service
this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
if (hostsAndPortsStr == null) {
throw new RuntimeException("Config key " +
ClientConfig.SERVER_RPC_ADDRESS + " is required");
}
int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
ClientConfig.SERVER_RPC_PORT_DEFAULT);
String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
this.endpoints = new Endpoint[hostsAndPorts.length];
for (int i = 0; i < this.endpoints.length; i++) {
this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort());
LOGGER.info("Initiate sentry sever endpoint: hostname " +
hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort());
}
}
@Override
public Object invokeImpl(Object proxy, Method method, Object[] args)
throws Exception {
int retryCount = 0;
/**
* The maximum number of retries that we will do. Each endpoint gets its
* own set of retries.
*/
int retryLimit = connectionRetryTotal * endpoints.length;
/**
* The index of the endpoint to use.
*/
int endpointIdx = freshestEndpointIdx;
/**
* A list of exceptions from each endpoint. This starts as null to avoid
* memory allocation in the common case where there is no error.
*/
Exception exc[] = null;
Object ret = null;
while (retryCount < retryLimit) {
GenericObjectPool<SentryPolicyServiceClient> pool =
endpoints[endpointIdx].getPool();
try {
if ((exc != null) &&
(exc[endpointIdx] instanceof TTransportException)) {
// If there was a TTransportException last time we tried to contact
// this endpoint, attempt to create a new connection before we try
// again.
synchronized (endpoints) {
// If there has room, create new instance and add it to the
// commons-pool. This instance will be returned first from the
// commons-pool, because the configuration is LIFO.
if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
pool.addObject();
}
}
}
// Try to make the RPC.
ret = invokeFromPool(method, args, pool);
break;
} catch (TTransportException e) {
if (exc == null) {
exc = new Exception[endpoints.length];
}
exc[endpointIdx] = e;
}
Exception lastExc = exc[endpointIdx];
synchronized (endpoints) {
int curFreshestEndpointIdx = freshestEndpointIdx;
if (curFreshestEndpointIdx == endpointIdx) {
curFreshestEndpointIdx =
(curFreshestEndpointIdx + 1) % endpoints.length;
freshestEndpointIdx = curFreshestEndpointIdx;
}
endpointIdx = curFreshestEndpointIdx;
}
// Increase the retry num, and throw the exception if can't retry again.
retryCount++;
if (retryCount == connectionRetryTotal) {
for (int i = 0; i < exc.length; i++) {
// Since freshestEndpointIdx is shared by multiple threads, it is possible that
// the ith endpoint has been tried in another thread and skipped in the current
// thread.
if (exc[i] != null) {
LOGGER.error("Sentry server " + endpoints[i].getEndPointStr()
+ " is in unreachable.");
}
}
throw new SentryUserException("Sentry servers are unreachable. " +
"Diagnostics is needed for unreachable servers.", lastExc);
}
}
return ret;
}
private Object invokeFromPool(Method method, Object[] args,
GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
Object result = null;
SentryPolicyServiceClient client;
try {
// get the connection from the pool, don't know if the connection is broken.
client = pool.borrowObject();
} catch (Exception e) {
LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
// If the exception is caused by connection problem, throw the TTransportException
// for reconnect.
if (e instanceof IOException) {
throw new TTransportException(e);
}
throw new SentryUserException(e.getMessage(), e);
}
try {
// do the thrift call
result = method.invoke(client, args);
} catch (InvocationTargetException e) {
// Get the target exception, check if SentryUserException or TTransportException is wrapped.
// TTransportException or IOException means there has connection problem with the pool.
Throwable targetException = e.getCause();
if (targetException instanceof SentryUserException) {
Throwable sentryTargetException = targetException.getCause();
// If there has connection problem, eg, invalid connection if the service restarted,
// sentryTargetException instanceof TTransportException or IOException = true.
if (sentryTargetException instanceof TTransportException
|| sentryTargetException instanceof IOException) {
// If the exception is caused by connection problem, destroy the instance and
// remove it from the commons-pool. Throw the TTransportException for reconnect.
pool.invalidateObject(client);
throw new TTransportException(sentryTargetException);
}
// The exception is thrown by thrift call, eg, SentryAccessDeniedException.
throw (SentryUserException) targetException;
}
throw e;
} finally{
try {
// return the instance to commons-pool
pool.returnObject(client);
} catch (Exception e) {
LOGGER.error(POOL_EXCEPTION_MESSAGE, e);
throw e;
}
}
return result;
}
@Override
public void close() {
for (int i = 0; i < endpoints.length; i++) {
try {
endpoints[i].getPool().close();
} catch (Exception e) {
LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
}
}
}
}