blob: 169ac02e14d8ecb246eef25f8fd8ebc4da0ce225 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.datanucleus.exceptions.NucleusException;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RetryingHMSHandler implements InvocationHandler {
private static final Logger LOG = LoggerFactory.getLogger(RetryingHMSHandler.class);
private static final String CLASS_NAME = RetryingHMSHandler.class.getName();
private static class Result {
private final Object result;
private final int numRetries;
public Result(Object result, int numRetries) {
this.result = result;
this.numRetries = numRetries;
}
}
private final IHMSHandler baseHandler;
private final MetaStoreInit.MetaStoreInitData metaStoreInitData =
new MetaStoreInit.MetaStoreInitData();
private final Configuration origConf; // base configuration
private final Configuration activeConf; // active configuration
private RetryingHMSHandler(Configuration origConf, IHMSHandler baseHandler, boolean local) throws MetaException {
this.origConf = origConf;
this.baseHandler = baseHandler;
if (local) {
baseHandler.setConf(origConf); // tests expect configuration changes applied directly to metastore
}
activeConf = baseHandler.getConf();
// This has to be called before initializing the instance of HMSHandler
// Using the hook on startup ensures that the hook always has priority
// over settings in *.xml. The thread local conf needs to be used because at this point
// it has already been initialized using hiveConf.
MetaStoreInit.updateConnectionURL(origConf, getActiveConf(), null, metaStoreInitData);
try {
//invoking init method of baseHandler this way since it adds the retry logic
//in case of transient failures in init method
invoke(baseHandler, baseHandler.getClass().getDeclaredMethod("init", (Class<?>[]) null),
null);
} catch (Throwable e) {
LOG.error("HMSHandler Fatal error: " + ExceptionUtils.getStackTrace(e));
MetaException me = new MetaException(e.getMessage());
me.initCause(e);
throw me;
}
}
public static IHMSHandler getProxy(Configuration conf, IHMSHandler baseHandler, boolean local)
throws MetaException {
RetryingHMSHandler handler = new RetryingHMSHandler(conf, baseHandler, local);
return (IHMSHandler) Proxy.newProxyInstance(
RetryingHMSHandler.class.getClassLoader(),
new Class[] { IHMSHandler.class }, handler);
}
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
int retryCount = -1;
int threadId = baseHandler.getThreadId();
boolean error = true;
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
perfLogger.perfLogBegin(CLASS_NAME, method.getName());
try {
Result result = invokeInternal(proxy, method, args);
retryCount = result.numRetries;
error = false;
return result.result;
} finally {
StringBuilder additionalInfo = new StringBuilder();
additionalInfo.append("threadId=").append(threadId).append(" retryCount=").append(retryCount)
.append(" error=").append(error);
perfLogger.perfLogEnd(CLASS_NAME, method.getName(), additionalInfo.toString());
}
}
public Result invokeInternal(final Object proxy, final Method method, final Object[] args) throws Throwable {
boolean gotNewConnectUrl = false;
boolean reloadConf = MetastoreConf.getBoolVar(origConf, ConfVars.HMS_HANDLER_FORCE_RELOAD_CONF);
long retryInterval = MetastoreConf.getTimeVar(origConf,
ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
int retryLimit = MetastoreConf.getIntVar(origConf, ConfVars.HMS_HANDLER_ATTEMPTS);
long timeout = MetastoreConf.getTimeVar(origConf,
ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
Deadline.registerIfNot(timeout);
if (reloadConf) {
MetaStoreInit.updateConnectionURL(origConf, getActiveConf(),
null, metaStoreInitData);
}
int retryCount = 0;
Throwable caughtException = null;
while (true) {
try {
if (reloadConf || gotNewConnectUrl) {
baseHandler.setConf(getActiveConf());
}
Object object = null;
boolean isStarted = Deadline.startTimer(method.getName());
try {
object = method.invoke(baseHandler, args);
} finally {
if (isStarted) {
Deadline.stopTimer();
}
}
return new Result(object, retryCount);
} catch (UndeclaredThrowableException e) {
if (e.getCause() != null) {
if (e.getCause() instanceof javax.jdo.JDOException) {
// Due to reflection, the jdo exception is wrapped in
// invocationTargetException
caughtException = e.getCause();
} else if (e.getCause() instanceof MetaException && e.getCause().getCause() != null
&& e.getCause().getCause() instanceof javax.jdo.JDOException) {
// The JDOException may be wrapped further in a MetaException
caughtException = e.getCause().getCause();
} else {
LOG.error(ExceptionUtils.getStackTrace(e.getCause()));
throw e.getCause();
}
} else {
LOG.error(ExceptionUtils.getStackTrace(e));
throw e;
}
} catch (InvocationTargetException e) {
if (e.getCause() instanceof javax.jdo.JDOException) {
// Due to reflection, the jdo exception is wrapped in
// invocationTargetException
caughtException = e.getCause();
} else if (e.getCause() instanceof NoSuchObjectException || e.getTargetException().getCause() instanceof NoSuchObjectException) {
String methodName = method.getName();
if (!methodName.startsWith("get_database") && !methodName.startsWith("get_table")
&& !methodName.startsWith("get_partition") && !methodName.startsWith("get_function")
&& !methodName.startsWith("get_stored_procedure") && !methodName.startsWith("find_package")) {
LOG.error(ExceptionUtils.getStackTrace(e.getCause()));
}
throw e.getCause();
} else if (e.getCause() instanceof MetaException && e.getCause().getCause() != null) {
if (e.getCause().getCause() instanceof javax.jdo.JDOException ||
e.getCause().getCause() instanceof NucleusException) {
// The JDOException or the Nucleus Exception may be wrapped further in a MetaException
caughtException = e.getCause().getCause();
} else if (e.getCause().getCause() instanceof DeadlineException) {
// The Deadline Exception needs no retry and be thrown immediately.
Deadline.clear();
LOG.error("Error happens in method " + method.getName() + ": " +
ExceptionUtils.getStackTrace(e.getCause()));
throw e.getCause();
} else {
LOG.error(ExceptionUtils.getStackTrace(e.getCause()));
throw e.getCause();
}
} else {
LOG.error(ExceptionUtils.getStackTrace(e.getCause()));
throw e.getCause();
}
}
if (retryCount >= retryLimit) {
LOG.error("HMSHandler Fatal error: " + ExceptionUtils.getStackTrace(caughtException));
MetaException me = new MetaException(caughtException.toString());
me.initCause(caughtException);
throw me;
}
assert (retryInterval >= 0);
retryCount++;
LOG.error(
String.format(
"Retrying HMSHandler after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit) +
" with error: " + ExceptionUtils.getStackTrace(caughtException));
Thread.sleep(retryInterval);
// If we have a connection error, the JDO connection URL hook might
// provide us with a new URL to access the datastore.
String lastUrl = MetaStoreInit.getConnectionURL(getActiveConf());
gotNewConnectUrl = MetaStoreInit.updateConnectionURL(origConf, getActiveConf(),
lastUrl, metaStoreInitData);
}
}
public Configuration getActiveConf() {
return activeConf;
}
}