blob: acf9b05281d8456ebdf77414dc7886f9aa4ef28f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.sentry.service.thrift;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.transport.SentryClientInvocationHandler;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* The PoolClientInvocationHandler is a proxy class for handling thrift
* call. For every thrift call, get the instance of
* SentryPolicyServiceBaseClient from the commons-pool, and return the instance
* to the commons-pool after complete the call. For any exception with the call,
* discard the instance and create a new one added to the commons-pool. Then,
* get the instance and do the call again. For the thread safe, the commons-pool
* will manage the connection pool, and every thread can get the connection by
* borrowObject() and return the connection to the pool by returnObject().
* TODO: Current pool model does not manage the opening connections very well,
* e.g. opening connections with failed servers should be closed promptly.
public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
private static final Logger LOGGER =
private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
private final Configuration conf;
* The configuration to use for our object pools.
* Null if we are not using object pools.
private final GenericObjectPoolConfig poolConfig;
* The total number of connection retries to attempt per endpoint.
private final int connectionRetryTotal;
* The configured sentry servers.
private final Endpoint[] endpoints;
* The endpoint which we are currently using. This can be read without any locks.
* It must be written while holding the endpoints lock.
private volatile int freshestEndpointIdx = 0;
private class Endpoint {
* The server address or hostname.
private final String addr;
* The server port.
private final int port;
* The server's poolFactory used to create new clients.
private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
* The server's pool of cached clients.
private final GenericObjectPool<SentryPolicyServiceClient> pool;
Endpoint(String addr, int port) {
this.addr = addr;
this.port = port;
this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
this.poolFactory, poolConfig, new AbandonedConfig());
GenericObjectPool<SentryPolicyServiceClient> getPool() {
return pool;
String getEndPointStr() {
return new String("endpoint at [address " + addr + ", port " + port + "]");
public PoolClientInvocationHandler(Configuration conf) throws Exception {
this.conf = conf;
this.poolConfig = new GenericObjectPoolConfig();
// config the pool size for commons-pool
// get the retry number for reconnecting service
this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
if (hostsAndPortsStr == null) {
throw new RuntimeException("Config key " +
ClientConfig.SERVER_RPC_ADDRESS + " is required");
int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
this.endpoints = new Endpoint[hostsAndPorts.length];
for (int i = 0; i < this.endpoints.length; i++) {
this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort());"Initiate sentry sever endpoint: hostname " +
hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort());
public Object invokeImpl(Object proxy, Method method, Object[] args)
throws Exception {
int retryCount = 0;
* The maximum number of retries that we will do. Each endpoint gets its
* own set of retries.
int retryLimit = connectionRetryTotal * endpoints.length;
* The index of the endpoint to use.
int endpointIdx = freshestEndpointIdx;
* A list of exceptions from each endpoint. This starts as null to avoid
* memory allocation in the common case where there is no error.
Exception exc[] = null;
Object ret = null;
while (retryCount < retryLimit) {
GenericObjectPool<SentryPolicyServiceClient> pool =
try {
if ((exc != null) &&
(exc[endpointIdx] instanceof TTransportException)) {
// If there was a TTransportException last time we tried to contact
// this endpoint, attempt to create a new connection before we try
// again.
synchronized (endpoints) {
// If there has room, create new instance and add it to the
// commons-pool. This instance will be returned first from the
// commons-pool, because the configuration is LIFO.
if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
// Try to make the RPC.
ret = invokeFromPool(method, args, pool);
} catch (TTransportException e) {
if (exc == null) {
exc = new Exception[endpoints.length];
exc[endpointIdx] = e;
Exception lastExc = exc[endpointIdx];
synchronized (endpoints) {
int curFreshestEndpointIdx = freshestEndpointIdx;
if (curFreshestEndpointIdx == endpointIdx) {
curFreshestEndpointIdx =
(curFreshestEndpointIdx + 1) % endpoints.length;
freshestEndpointIdx = curFreshestEndpointIdx;
endpointIdx = curFreshestEndpointIdx;
// Increase the retry num, and throw the exception if can't retry again.
if (retryCount == connectionRetryTotal) {
for (int i = 0; i < exc.length; i++) {
// Since freshestEndpointIdx is shared by multiple threads, it is possible that
// the ith endpoint has been tried in another thread and skipped in the current
// thread.
if (exc[i] != null) {
LOGGER.error("Sentry server " + endpoints[i].getEndPointStr()
+ " is in unreachable.");
throw new SentryUserException("Sentry servers are unreachable. " +
"Diagnostics is needed for unreachable servers.", lastExc);
return ret;
private Object invokeFromPool(Method method, Object[] args,
GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
Object result = null;
SentryPolicyServiceClient client;
try {
// get the connection from the pool, don't know if the connection is broken.
client = pool.borrowObject();
} catch (Exception e) {
// If the exception is caused by connection problem, throw the TTransportException
// for reconnect.
if (e instanceof IOException) {
throw new TTransportException(e);
throw new SentryUserException(e.getMessage(), e);
try {
// do the thrift call
result = method.invoke(client, args);
} catch (InvocationTargetException e) {
// Get the target exception, check if SentryUserException or TTransportException is wrapped.
// TTransportException or IOException means there has connection problem with the pool.
Throwable targetException = e.getCause();
if (targetException instanceof SentryUserException) {
Throwable sentryTargetException = targetException.getCause();
// If there has connection problem, eg, invalid connection if the service restarted,
// sentryTargetException instanceof TTransportException or IOException = true.
if (sentryTargetException instanceof TTransportException
|| sentryTargetException instanceof IOException) {
// If the exception is caused by connection problem, destroy the instance and
// remove it from the commons-pool. Throw the TTransportException for reconnect.
throw new TTransportException(sentryTargetException);
// The exception is thrown by thrift call, eg, SentryAccessDeniedException.
throw (SentryUserException) targetException;
throw e;
} finally{
try {
// return the instance to commons-pool
} catch (Exception e) {
throw e;
return result;
public void close() {
for (int i = 0; i < endpoints.length; i++) {
try {
} catch (Exception e) {