blob: 939dfc2690d43ac4c14f93a2246fdfdeeaf09182 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelException.FaultyMember;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.Membership;
import java.net.ConnectException;
/**
* <p>Title: A perfect failure detector </p>
*
* <p>Description: The TcpFailureDetector is a useful interceptor
* that adds reliability to the membership layer.</p>
* <p>
* If the network is busy, or the system is busy so that the membership receiver thread
* is not getting enough time to update its table, members can be &quot;timed out&quot;.
* This failure detector will intercept the memberDisappeared message (unless it is a true shutdown message)
* and connect to the member using TCP.
* </p>
* <p>
* The TcpFailureDetector works in two ways. <br>
* 1. It intercepts memberDisappeared events. <br>
* 2. It catches send errors.
* </p>
*
* @author Filip Hanik
* @version 1.0
*/
public class TcpFailureDetector extends ChannelInterceptorBase {

    /** Logger shared by all instances of this interceptor. */
    private static final org.apache.commons.logging.Log log =
        org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );

    /**
     * Payload of the probe message written to a member by the send test.
     * An incoming message whose body equals this array (and whose options pass
     * <code>okToProcess</code>) is swallowed by {@link #messageReceived(ChannelMessage)}.
     */
    protected static byte[] TCP_FAIL_DETECT = new byte[] {
        79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
        125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
        55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
        85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43};

    /** Configuration flag; it is not consulted anywhere inside this class. */
    protected boolean performConnectTest = true;

    /** Timeout in milliseconds handed to <code>Socket.connect</code> during a probe. */
    protected long connectTimeout = 1000;//1 second default

    /** When true, a probe also writes a {@link #TCP_FAIL_DETECT} message after connecting. */
    protected boolean performSendTest = true;

    /** When true (and the send test is enabled), a probe requests an ack and waits to read it. */
    protected boolean performReadTest = false;

    /** Timeout in milliseconds handed to <code>Socket.setSoTimeout</code> during a probe. */
    protected long readTestTimeout = 5000;//5 seconds

    /**
     * The membership view maintained by this interceptor. Created lazily by
     * {@link #setupMembership()}. Declared <code>volatile</code> because every
     * caller tests it for <code>null</code> outside of any lock before triggering
     * the synchronized initialisation.
     */
    protected volatile Membership membership = null;

    /**
     * Members reported as disappeared that still answered the probe, mapped to the
     * time they became suspect. Accessed while holding the lock on {@link #membership}.
     */
    protected HashMap removeSuspects = new HashMap();

    /**
     * Members reported as added that did not answer the probe, mapped to the
     * time they became suspect. Accessed while holding the lock on {@link #membership}.
     */
    protected HashMap addSuspects = new HashMap();

    /**
     * Passes the message down the interceptor stack. If the send fails, every
     * faulty member whose cause is present and is not a
     * <code>RemoteProcessException</code> is run through
     * {@link #memberDisappeared(Member)}; the original exception is then rethrown.
     *
     * @param destination the members to send to
     * @param msg the message to send
     * @param payload the interceptor payload, passed through unchanged
     * @throws ChannelException the exception raised by the underlying send
     */
    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
        try {
            super.sendMessage(destination, msg, payload);
        } catch ( ChannelException cx ) {
            FaultyMember[] mbrs = cx.getFaultyMembers();
            for ( int i=0; i<mbrs.length; i++ ) {
                //RemoteProcessException's are ok, the remote node is reachable
                if ( mbrs[i].getCause()!=null &&
                     (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {
                    this.memberDisappeared(mbrs[i].getMember());
                }//end if
            }//for
            throw cx;
        }
    }

    /**
     * Filters out failure detector probe messages. A message is forwarded upwards
     * unless its options pass <code>okToProcess</code> and its body is byte-for-byte
     * equal to {@link #TCP_FAIL_DETECT}.
     *
     * @param msg the incoming message
     */
    public void messageReceived(ChannelMessage msg) {
        //messages without our option flag are always passed on
        boolean process = true;
        if ( okToProcess(msg.getOptions()) ) {
            //check to see if it is a testMessage, if so, process = false
            process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
                        (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
        }//end if
        if ( process ) super.messageReceived(msg);
        else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
    }//messageReceived

    /**
     * Handles a "member added" event from the layer below.
     * <ul>
     * <li>If the member is a remove-suspect, the suspicion is cleared and nothing
     *     is forwarded.</li>
     * <li>If the member is not in the local membership, it is probed. When the
     *     probe succeeds it is recorded and the event is forwarded upwards; when
     *     it fails the member becomes an add-suspect.</li>
     * <li>Otherwise nothing happens.</li>
     * </ul>
     *
     * @param member the member that was reported as added
     */
    public void memberAdded(Member member) {
        if ( membership == null ) setupMembership();
        boolean notify = false;
        synchronized (membership) {
            if (removeSuspects.containsKey(member)) {
                //previously marked suspect, system below picked up the member again
                removeSuspects.remove(member);
            } else if (membership.getMember( (MemberImpl) member) == null){
                //if we add it here, then add it upwards too
                //check to see if it is alive
                if (memberAlive(member)) {
                    membership.memberAlive( (MemberImpl) member);
                    notify = true;
                } else {
                    addSuspects.put(member, new Long(System.currentTimeMillis()));
                }
            }
        }
        //notify outside of the lock
        if ( notify ) super.memberAdded(member);
    }

    /**
     * Handles a "member disappeared" event, either from the layer below or from a
     * failed send (see {@link #sendMessage(Member[], ChannelMessage, InterceptorPayload)}).
     * <p>
     * A member that is not part of the local membership is ignored: it was either
     * never announced upwards by this interceptor or its disappearance has already
     * been announced, so forwarding the event again would produce a duplicate
     * notification. For a known member the event is forwarded upwards when the
     * member carries the shutdown payload or fails the probe; otherwise the member
     * is kept and recorded as a remove-suspect.
     * </p>
     *
     * @param member the member that was reported as disappeared
     */
    public void memberDisappeared(Member member) {
        if ( membership == null ) setupMembership();
        boolean notify = false;
        boolean known = true;
        boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
        if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
        synchronized (membership) {
            if (membership.getMember( (MemberImpl) member) == null) {
                //nothing to remove and nothing to announce, avoid duplicate events
                known = false;
            } else if (shutdown || !memberAlive(member)) {
                //the member really is gone (or said so itself)
                membership.removeMember( (MemberImpl) member);
                removeSuspects.remove(member);
                notify = true;
            } else {
                //add the member as suspect, heartbeat() will re-check it
                removeSuspects.put(member, new Long(System.currentTimeMillis()));
            }
        }
        if ( !known ) {
            log.info("Verification complete. Member already disappeared["+member+"]");
        } else if ( notify ) {
            log.info("Verification complete. Member disappeared["+member+"]");
            super.memberDisappeared(member);
        } else {
            log.info("Verification complete. Member still alive["+member+"]");
        }
    }

    /**
     * @return whether the membership view kept by this interceptor has members
     */
    public boolean hasMembers() {
        if ( membership == null ) setupMembership();
        return membership.hasMembers();
    }

    /**
     * @return the members of the membership view kept by this interceptor
     */
    public Member[] getMembers() {
        if ( membership == null ) setupMembership();
        return membership.getMembers();
    }

    /**
     * @param mbr the member to look up
     * @return the result of looking <code>mbr</code> up in the membership view kept
     *         by this interceptor
     */
    public Member getMember(Member mbr) {
        if ( membership == null ) setupMembership();
        return membership.getMember(mbr);
    }

    /**
     * Delegates to the next interceptor.
     *
     * @param incAlive passed through unchanged
     * @return the local member as reported by the layer below
     */
    public Member getLocalMember(boolean incAlive) {
        return super.getLocalMember(incAlive);
    }

    /**
     * Periodic maintenance, performed while holding the lock on {@link #membership}:
     * <ol>
     * <li>Synchronises the local view with the members reported by the layer
     *     below; a member that turns out to be new to the local view is probed and
     *     either announced upwards or removed again.</li>
     * <li>Re-probes remove-suspects and announces those that no longer answer.</li>
     * <li>Re-probes add-suspects and announces those that answer now.</li>
     * </ol>
     * Any exception is logged; the heartbeat of the next interceptor is always invoked.
     */
    public void heartbeat() {
        try {
            if (membership == null) setupMembership();
            synchronized (membership) {
                //update all alive times
                Member[] members = super.getMembers();
                for (int i = 0; members != null && i < members.length; i++) {
                    if (membership.memberAlive( (MemberImpl) members[i])) {
                        //we don't have this one in our membership, check to see if he/she is alive
                        if (memberAlive(members[i])) {
                            log.warn("Member added, even though we werent notified:" + members[i]);
                            super.memberAdded(members[i]);
                        } else {
                            membership.removeMember( (MemberImpl) members[i]);
                        } //end if
                    } //end if
                } //for

                //check suspect members if they are still alive,
                //if not, simply issue the memberDisappeared message
                MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
                for (int i = 0; i < keys.length; i++) {
                    MemberImpl m = (MemberImpl) keys[i];
                    if (membership.getMember(m) != null && (!memberAlive(m))) {
                        membership.removeMember(m);
                        //finish our own bookkeeping before calling upwards, so a failing
                        //listener cannot leave a removed member behind as a suspect
                        removeSuspects.remove(m);
                        super.memberDisappeared(m);
                        log.info("Suspect member, confirmed dead.["+m+"]");
                    } //end if
                }

                //check add suspects members if they are alive now,
                //if they are, simply issue the memberAdded message
                keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
                for (int i = 0; i < keys.length; i++) {
                    MemberImpl m = (MemberImpl) keys[i];
                    if ( membership.getMember(m) == null && (memberAlive(m))) {
                        membership.memberAlive(m);
                        //same ordering as above: bookkeeping first, notification second
                        addSuspects.remove(m);
                        super.memberAdded(m);
                        log.info("Suspect member, confirmed alive.["+m+"]");
                    } //end if
                }
            }
        } catch ( Exception x ) {
            log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
        } finally {
            super.heartbeat();
        }
    }

    /**
     * Creates the local membership view, seeded with the local member, if it does
     * not exist yet. Synchronized so that concurrent callers create it only once.
     */
    protected synchronized void setupMembership() {
        if ( membership == null ) {
            membership = new Membership((MemberImpl)super.getLocalMember(true));
        }
    }

    /**
     * Probes a member using this interceptor's configured tests, timeouts and
     * option flag.
     *
     * @param mbr the member to probe
     * @return true if the member answered the probe
     */
    protected boolean memberAlive(Member mbr) {
        return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
    }

    /**
     * Probes a member over TCP.
     * <p>
     * A member carrying the shutdown payload is reported as not alive without any
     * network access. Otherwise a socket is connected to the member's host and
     * port. With <code>sendTest</code> a packaged message containing
     * <code>msgData</code> is written; with <code>readTest</code> as well, an ack
     * is requested and the result is whether at least one byte could be read.
     * The socket is always closed before returning.
     * </p>
     *
     * @param mbr the member to probe
     * @param msgData the body of the probe message
     * @param sendTest whether to write a probe message after connecting
     * @param readTest whether to request and wait for an ack (only used together with <code>sendTest</code>)
     * @param readTimeout socket read timeout in milliseconds
     * @param conTimeout connect timeout in milliseconds
     * @param optionFlag option bits to set on the probe message
     * @return true if the probe succeeded; false on shutdown payload, timeout,
     *         refused connection or any other error (the latter is logged)
     */
    protected static boolean memberAlive(Member mbr, byte[] msgData,
                                         boolean sendTest, boolean readTest,
                                         long readTimeout, long conTimeout,
                                         int optionFlag) {
        //could be a shutdown notification
        if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;

        Socket socket = new Socket();
        try {
            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
            socket.setSoTimeout((int)readTimeout);
            socket.connect(addr, (int) conTimeout);
            if ( sendTest ) {
                ChannelData data = new ChannelData(true);
                data.setAddress(mbr);
                data.setMessage(new XByteBuffer(msgData,false));
                data.setTimestamp(System.currentTimeMillis());
                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
                //only ask for an ack if we are going to wait for it
                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
                else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
                data.setOptions(options);
                byte[] message = XByteBuffer.createDataPackage(data);
                socket.getOutputStream().write(message);
                if ( readTest ) {
                    //the content of the reply is irrelevant, the buffer is just reused
                    int length = socket.getInputStream().read(message);
                    return length > 0;
                }
            }//end if
            return true;
        } catch ( SocketTimeoutException sx) {
            //do nothing, we couldn't connect or read in time
        } catch ( ConnectException cx) {
            //do nothing, we couldn't connect
        } catch (Exception x ) {
            log.error("Unable to perform failure detection check, assuming member down.",x);
        } finally {
            try {socket.close(); } catch ( Exception ignore ){}
        }
        return false;
    }
}