blob: 112f0509dfa661f6484e94860efc921f53d1485a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.core.common.transport;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool
* model. Currently only one client connection is allowed.
* <p>
* For every rpc call, if the client is not connected, it will first connect to one of the
* sentry servers, and then do the thrift call to the connected sentry server, which will
* execute the requested method and return back the response. If it is failed with connection
* problem, it will close the current connection and retry (reconnect and resend the
* thrift call) no more than rpcRetryTotal times. If the client is already connected, it
* will reuse the existing connection, and do the thrift call.
* <p>
* During reconnection, invocatiaon handler will first cycle through all the configured sentry servers, and
* then retry the whole server list no more than connectionFullRetryTotal times. In this
* case, it won't introduce more latency when some server fails.
* <p>
*/
public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RetryClientInvocationHandler.class);
private SentryConnection client = null;
private final int maxRetryCount;
/**
* Initialize the sentry configurations, including rpc retry count and client connection
* configs for SentryPolicyServiceClientDefaultImpl
*/
public RetryClientInvocationHandler(Configuration conf, SentryConnection clientObject,
SentryClientTransportConfigInterface transportConfig) {
Preconditions.checkNotNull(conf, "Configuration object cannot be null");
Preconditions.checkNotNull(clientObject, "Client Object cannot be null");
client = clientObject;
maxRetryCount = transportConfig.getSentryRpcRetryTotal(conf);
}
/**
* For every rpc call, if the client is not connected, it will first connect to a sentry
* server, and then do the thrift call to the connected sentry server, which will
* execute the requested method and return back the response. If it is failed with
* connection problem, it will close the current connection, and retry (reconnect and
* resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
* if failed retry after rpcRetryTotal times.
* if it is failed with other exception, method would just re-throw the exception.
*/
@Override
public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
Exception lastExc = null;
for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
connect();
// do the thrift call
try {
LOGGER.debug("Calling client {}", method.getName());
return method.invoke(client, args);
} catch (InvocationTargetException e) {
// Get the target exception, check if SentryUserException or TTransportException is wrapped.
// TTransportException means there is a connection problem.
LOGGER.error("failed to execute {}", method.getName(), e);
Throwable targetException = e.getCause();
if (!((targetException instanceof SentryUserException) ||
(targetException instanceof SentryHdfsServiceException))) {
throw e;
}
Throwable sentryTargetException = targetException.getCause();
// If there has connection problem, eg, invalid connection if the service restarted,
// sentryTargetException instanceof TTransportException will be true.
if (sentryTargetException instanceof TTransportException) {
// Retry when the exception is caused by connection problem.
lastExc = new TTransportException(sentryTargetException);
LOGGER.error("Thrift call failed", lastExc);
// The connection to the server is bad, inform the client of the problem
client.invalidate();
} else {
// Semantic exception which does not indicate the connection failure.
// Do not need to reconnect to the sentry server.
if (targetException instanceof SentryUserException) {
throw (SentryUserException) targetException;
} else {
throw (SentryHdfsServiceException) targetException;
}
}
}
}
// Throw the exception as reaching the max rpc retry num.
String error = String.format("Request failed, %d retries attempted ", maxRetryCount);
throw new SentryUserException(error, lastExc);
}
/**
* Connect the client, retry multiple times
* @throws Exception
*/
private void connect() throws Exception {
Exception lastExc = null;
for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
try {
client.connect();
return;
} catch (Exception e) {
// Increase the retry num
// Retry when the exception is caused by connection problem.
LOGGER.error("failed to connect", e);
retryCount++;
lastExc = e;
}
}
assert lastExc != null;
throw lastExc;
}
@Override
public synchronized void close() {
//We are done with this client
client.done();
}
}