blob: e454d490d241b8ca595e0831a4b0fb9cd5bc3795 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.List;
import javax.net.SocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import static java.lang.Thread.sleep;
/**
 * Heartbeat RPC proxy invocation handler that handles fail over.
 * <p>
 * Calls made through the object returned by {@link #getProxy()} are forwarded to an underlying
 * {@link StreamingContainerUmbilicalProtocol} RPC proxy. When a call fails, the handler sleeps for
 * {@code retryDelayMillis}, re-reads the connect URI from {@link FSRecoveryHandler} and, if the URI
 * changed, reconnects to the new address (which also restarts the retry deadline). Once the retry
 * deadline has passed, the connection is closed and the last failure is rethrown.
 * <p>
 * Timeout settings default to the JVM system properties {@link #RPC_TIMEOUT}, {@link #RETRY_TIMEOUT}
 * and {@link #RETRY_DELAY}, and may be overridden per connection by query parameters of the connect
 * URI (see {@link #toConnectURI(InetSocketAddress, int, long, long)}).
 * <p>
 * NOTE(review): state is mutated without synchronization; presumably a single thread drives the
 * proxy — confirm before sharing an instance across threads.
 *
 * @since 0.9.3
 */
public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler, Closeable
{
  private static final Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);

  /** System property: RPC timeout in milliseconds passed to {@code RPC.getProxy}. */
  public static final String RPC_TIMEOUT = "com.datatorrent.stram.rpc.timeout";
  /** System property: how long (ms) failed calls keep being retried. */
  public static final String RETRY_TIMEOUT = "com.datatorrent.stram.rpc.retry.timeout";
  /** System property: delay (ms) between retries of a failed call. */
  public static final String RETRY_DELAY = "com.datatorrent.stram.rpc.delay.timeout";

  /** Connect-URI query parameter overriding {@link #RETRY_TIMEOUT}. */
  public static final String QP_retryTimeoutMillis = "retryTimeoutMillis";
  /** Connect-URI query parameter overriding {@link #RETRY_DELAY}. */
  public static final String QP_retryDelayMillis = "retryDelayMillis";
  /** Connect-URI query parameter overriding {@link #RPC_TIMEOUT}. */
  public static final String QP_rpcTimeout = "rpcTimeout";

  private static final int RETRY_TIMEOUT_DEFAULT = 30000;
  private static final int RETRY_DELAY_DEFAULT = 10000;
  private static final int RPC_TIMEOUT_DEFAULT = 5000;

  private final Configuration conf;
  /** Current underlying RPC proxy; {@code null} once closed or after a failed reconnect. */
  private StreamingContainerUmbilicalProtocol umbilical;
  /** Connect URI of the current {@link #umbilical}; only updated after a successful connect. */
  private String lastConnectURI;
  private long retryTimeoutMillis;
  private long retryDelayMillis;
  private int rpcTimeout;
  private final UserGroupInformation currentUser;
  private final SocketFactory defaultSocketFactory;
  private final FSRecoveryHandler fsRecoveryHandler;

  /**
   * Creates the handler and connects to the URI currently published via {@link FSRecoveryHandler}.
   *
   * @param appPath application path used to construct the {@link FSRecoveryHandler} that supplies
   *                the connect URI
   * @param conf    Hadoop configuration used for the socket factory and the RPC proxy
   * @throws RuntimeException wrapping any {@link IOException} raised during setup or the initial
   *                          connect
   */
  public RecoverableRpcProxy(String appPath, Configuration conf)
  {
    this.conf = conf;
    try {
      currentUser = UserGroupInformation.getCurrentUser();
      defaultSocketFactory = NetUtils.getDefaultSocketFactory(conf);
      fsRecoveryHandler = new FSRecoveryHandler(appPath, conf);
      connect(0);
    } catch (IOException e) {
      LOG.error("Fail to create RecoverableRpcProxy", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Re-reads the connect URI and, if it differs from the one in use, replaces the underlying RPC
   * proxy with a new one connected to that address.
   *
   * @param timeMillis current retry deadline (epoch milliseconds)
   * @return {@code now + retryTimeoutMillis} if a new connection was made, otherwise
   *         {@code timeMillis} unchanged
   * @throws IOException if the connect URI cannot be read or the RPC proxy cannot be created
   */
  private long connect(long timeMillis) throws IOException
  {
    String uriStr = fsRecoveryHandler.readConnectUri();
    if (uriStr.equals(lastConnectURI)) {
      return timeMillis;
    }
    LOG.debug("Got new RPC connect address {}", uriStr);
    if (umbilical != null) {
      RPC.stopProxy(umbilical);
      // never keep a reference to a stopped proxy, in case creating the new one fails below
      umbilical = null;
    }
    retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
    retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
    rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
    URI heartbeatUri = URI.create(uriStr);
    applyQueryParameters(heartbeatUri);
    InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(), heartbeatUri.getPort());
    umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID, address, currentUser, conf, defaultSocketFactory, rpcTimeout);
    // record the URI only once the proxy exists, so a failed attempt is retried on the next connect
    lastConnectURI = uriStr;
    // reset timeout
    return System.currentTimeMillis() + retryTimeoutMillis;
  }

  /** Overrides the timeout settings with any values supplied as query parameters of {@code uri}. */
  private void applyQueryParameters(URI uri)
  {
    String queryStr = uri.getQuery();
    if (queryStr == null) {
      return;
    }
    List<NameValuePair> queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset());
    if (queryList == null) {
      return;
    }
    for (NameValuePair pair : queryList) {
      String key = pair.getName();
      String value = pair.getValue();
      if (QP_rpcTimeout.equals(key)) {
        this.rpcTimeout = Integer.parseInt(value);
      } else if (QP_retryTimeoutMillis.equals(key)) {
        this.retryTimeoutMillis = Long.parseLong(value);
      } else if (QP_retryDelayMillis.equals(key)) {
        this.retryDelayMillis = Long.parseLong(value);
      }
    }
  }

  /**
   * Returns a dynamic proxy that implements the same interfaces as the underlying RPC proxy and
   * routes every call through {@link #invoke(Object, Method, Object[])}.
   *
   * @return the recoverable proxy
   * @throws IOException if this handler has been closed
   */
  public StreamingContainerUmbilicalProtocol getProxy() throws IOException
  {
    if (umbilical == null) {
      throw new IOException("RecoverableRpcProxy is closed.");
    }
    StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)Proxy.newProxyInstance(umbilical.getClass().getClassLoader(), umbilical.getClass().getInterfaces(), this);
    return recoverableProxy;
  }

  /**
   * Forwards {@code method} to the underlying RPC proxy, retrying on failure until the retry
   * deadline passes. Before each retry it sleeps {@code retryDelayMillis} and reconnects if the
   * published connect URI changed.
   *
   * @throws IOException          if the handler is closed or a reconnect fails
   * @throws InterruptedException if interrupted while waiting to retry (interrupt status is kept)
   * @throws Throwable            the unwrapped failure of the last attempt once retries are exhausted;
   *                              the handler is closed in that case
   */
  @Override
  @SuppressWarnings("SleepWhileInLoop")
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
  {
    long endTimeMillis = System.currentTimeMillis() + retryTimeoutMillis;
    if (umbilical == null) {
      endTimeMillis = connect(endTimeMillis);
    }
    while (true) {
      if (umbilical == null) {
        throw new IOException("RecoverableRpcProxy is closed.");
      }
      try {
        return method.invoke(umbilical, args);
      } catch (Throwable t) {
        // handle RPC failure
        Throwable failure = unwrap(t);
        final long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis < endTimeMillis) {
          LOG.warn("RPC failure, will retry after {} ms (remaining {} ms)", retryDelayMillis, endTimeMillis - currentTimeMillis, failure);
          try {
            sleep(retryDelayMillis);
          } catch (InterruptedException e) {
            // keep the caller's interrupt status; the exception still propagates as before
            Thread.currentThread().interrupt();
            throw e;
          }
          endTimeMillis = connect(endTimeMillis);
        } else {
          LOG.error("Giving up RPC connection recovery after {} ms", currentTimeMillis - endTimeMillis + retryTimeoutMillis, failure);
          close();
          throw failure;
        }
      }
    }
  }

  /**
   * Strips reflection wrappers ({@link InvocationTargetException}, {@link UndeclaredThrowableException})
   * to reach the actual failure. A wrapper without a cause is returned as-is, so the loop always ends.
   */
  private static Throwable unwrap(Throwable t)
  {
    Throwable current = t;
    while ((current instanceof InvocationTargetException || current instanceof UndeclaredThrowableException)
        && current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /** Stops the underlying RPC proxy, if any; later calls fail with "RecoverableRpcProxy is closed.". */
  @Override
  public void close()
  {
    LOG.debug("Closing RPC connection {}", lastConnectURI);
    if (umbilical != null) {
      RPC.stopProxy(umbilical);
      umbilical = null;
    }
  }

  /**
   * Builds a {@code stram://host:port} connect URI for {@code address}. The timeout query parameters
   * are taken from the system properties {@link #RPC_TIMEOUT}, {@link #RETRY_DELAY} and
   * {@link #RETRY_TIMEOUT}, or their built-in defaults.
   *
   * @param address RPC server address
   * @return the connect URI
   * @throws Exception if the URI cannot be built
   */
  public static URI toConnectURI(final InetSocketAddress address) throws Exception
  {
    int rpcTimeoutMillis = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
    long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
    long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
    return toConnectURI(address, rpcTimeoutMillis, retryDelayMillis, retryTimeoutMillis);
  }

  /**
   * Builds a {@code stram://host:port} connect URI carrying the given timeouts as query parameters,
   * which {@link RecoverableRpcProxy} reads back when connecting.
   *
   * @param address            RPC server address
   * @param rpcTimeoutMillis   value for {@link #QP_rpcTimeout}
   * @param retryDelayMillis   value for {@link #QP_retryDelayMillis}
   * @param retryTimeoutMillis value for {@link #QP_retryTimeoutMillis}
   * @return the connect URI
   * @throws Exception if the URI cannot be built
   */
  public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, long retryDelayMillis, long retryTimeoutMillis) throws Exception
  {
    return new URIBuilder()
        .setScheme("stram")
        .setHost(address.getHostName())
        .setPort(address.getPort())
        .setParameter(RecoverableRpcProxy.QP_rpcTimeout, Integer.toString(rpcTimeoutMillis))
        .setParameter(RecoverableRpcProxy.QP_retryDelayMillis, Long.toString(retryDelayMillis))
        .setParameter(RecoverableRpcProxy.QP_retryTimeoutMillis, Long.toString(retryTimeoutMillis))
        .build();
  }
}