/*
 * Copyright 1999,2004 The Apache Software Foundation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.group.interceptors;

import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelException.FaultyMember;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.Membership;

/**
 * <p>Title: A perfect failure detector </p>
 *
 * <p>Description: The TcpFailureDetector is a useful interceptor
 * that adds reliability to the membership layer.</p>
 * <p>
 * If the network is busy, or the system is busy so that the membership receiver thread
 * is not getting enough time to update its table, members can be &quot;timed out&quot;.
 * This failure detector intercepts the memberDisappeared message (unless it is a true
 * shutdown message) and connects to the member using TCP before believing it.
 * </p>
 * <p>
 * The TcpFailureDetector works in two ways. <br>
 * 1. It intercepts memberDisappeared events <br>
 * 2. It catches send errors
 * </p>
 * <p>
 * The interceptor keeps its own {@link Membership} view plus two suspect tables:
 * <code>removeSuspects</code> (members reported as gone that still answered the TCP probe)
 * and <code>addSuspects</code> (members reported as added that did not answer the probe).
 * Both tables are re-examined on every {@link #heartbeat()}. All updates to the view and
 * the suspect tables happen while holding the monitor of the <code>membership</code> object.
 * </p>
 *
 * @author Filip Hanik
 * @version 1.0
 */
public class TcpFailureDetector extends ChannelInterceptorBase {
    
    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
    
    /**
     * Payload of the probe message written to a member during the send test.
     * Incoming messages carrying exactly these bytes are swallowed by
     * {@link #messageReceived(ChannelMessage)}.
     */
    protected static byte[] TCP_FAIL_DETECT = new byte[] {
        79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
        125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
        55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
        85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43};      
    
    /** NOTE(review): not consulted anywhere in this class; kept for subclasses/compatibility. */
    protected boolean performConnectTest = true;

    /** Timeout in milliseconds for the TCP connect of a probe. */
    protected long connectTimeout = 1000;//1 second default
    
    /** When true, the probe also writes a {@link #TCP_FAIL_DETECT} message after connecting. */
    protected boolean performSendTest = true;

    /** When true (and the send test is on), the probe requests an ack and waits to read it. */
    protected boolean performReadTest = false;
    
    /** Socket read timeout in milliseconds used by the probe. */
    protected long readTestTimeout = 5000;//5 seconds
    
    /** This interceptor's own view of the group; created lazily by {@link #setupMembership()}. */
    protected Membership membership = null;
    
    /** Member -> Long(time marked): reported as disappeared but still reachable over TCP. */
    protected HashMap removeSuspects = new HashMap();
    
    /** Member -> Long(time marked): reported as added but not (yet) reachable over TCP. */
    protected HashMap addSuspects = new HashMap();
    
    /**
     * Sends the message down the stack. If the send fails, every faulty member whose
     * cause is something other than a {@link RemoteProcessException} is run through
     * {@link #memberDisappeared(Member)} for verification; the original exception is
     * always rethrown.
     *
     * @param destination the members to send to
     * @param msg the message to send
     * @param payload optional interceptor payload
     * @throws ChannelException the exception raised by the underlying send, unchanged
     */
    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
        try {
            super.sendMessage(destination, msg, payload);
        }catch ( ChannelException cx ) {
            FaultyMember[] mbrs = cx.getFaultyMembers();
            for ( int i=0; i<mbrs.length; i++ ) {
                if ( mbrs[i].getCause()!=null &&  
                     (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok
                    this.memberDisappeared(mbrs[i].getMember());
                }//end if
            }//for
            throw cx;
        }
    }

    /**
     * Passes the message up the stack unless it is a failure detector probe, i.e. the
     * options are accepted by <code>okToProcess</code> and the payload equals
     * {@link #TCP_FAIL_DETECT}. Probes are only logged at debug level.
     *
     * @param msg the incoming message
     */
    public void messageReceived(ChannelMessage msg) {
        //catch incoming 
        boolean process = true;
        if ( okToProcess(msg.getOptions()) ) {
            //check to see if it is a testMessage, if so, process = false
            process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
                        (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
        }//end if
            
        //anything that is not a probe is handed to the next interceptor
        if ( process ) super.messageReceived(msg);
        else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
    }//messageReceived
    
    
    /**
     * Handles a memberAdded notification from the layer below.
     * <ul>
     * <li>A member currently marked as remove-suspect is simply un-suspected; nothing is
     *     sent upwards.</li>
     * <li>A member unknown to the local view is probed over TCP. If reachable it is added
     *     to the view and announced upwards, otherwise it is recorded as add-suspect and
     *     re-checked on {@link #heartbeat()}.</li>
     * </ul>
     *
     * @param member the member reported as added
     */
    public void memberAdded(Member member) {
        if ( membership == null ) setupMembership();
        boolean notify = false;
        synchronized (membership) {
            if (removeSuspects.containsKey(member)) {
                //previously marked suspect, system below picked up the member again
                removeSuspects.remove(member);
            } else if (membership.getMember( (MemberImpl) member) == null){
                //if we add it here, then add it upwards too
                //check to see if it is alive
                if (memberAlive(member)) {
                    membership.memberAlive( (MemberImpl) member);
                    //no longer pending: otherwise the entry would linger in addSuspects forever
                    addSuspects.remove(member);
                    notify = true;
                } else {
                    addSuspects.put(member, new Long(System.currentTimeMillis()));
                }
            }
        }
        //notify outside the lock
        if ( notify ) super.memberAdded(member);
    }

    /**
     * Handles a memberDisappeared notification (from the layer below or from a failed
     * send). Unless the member carries the shutdown payload, it is probed over TCP first:
     * a reachable member is only recorded as remove-suspect, an unreachable one is removed
     * from the local view and both suspect tables, and the event is passed upwards.
     *
     * @param member the member reported as gone
     */
    public void memberDisappeared(Member member) {
        if ( membership == null ) setupMembership();
        boolean notify = false;
        boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
        if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
        synchronized (membership) {
            //check to see if the member really is gone
            //if the payload is not a shutdown message
            if (shutdown || !memberAlive(member)) {
                //not correct, we need to maintain the map
                membership.removeMember( (MemberImpl) member);
                removeSuspects.remove(member);
                //a confirmed-gone member must not stay pending as add-suspect, otherwise
                //every heartbeat keeps probing it (blocking up to connectTimeout each time)
                addSuspects.remove(member);
                notify = true;
            } else {
                //add the member as suspect
                removeSuspects.put(member, new Long(System.currentTimeMillis()));
            }
        }
        //notify outside the lock
        if ( notify ) {
            log.info("Verification complete. Member disappeared["+member+"]");
            super.memberDisappeared(member);
        } else {
            log.info("Verification complete. Member still alive["+member+"]");

        }
    }
    
    /**
     * @return whether this interceptor's own membership view contains members
     */
    public boolean hasMembers() {
        if ( membership == null ) setupMembership();
        return membership.hasMembers();
    }

    /**
     * @return the members of this interceptor's own membership view
     */
    public Member[] getMembers() {
        if ( membership == null ) setupMembership();
        return membership.getMembers();
    }

    /**
     * Looks the member up in this interceptor's own membership view.
     *
     * @param mbr the member to look up
     * @return whatever the local {@link Membership} returns for <code>mbr</code>
     */
    public Member getMember(Member mbr) {
        if ( membership == null ) setupMembership();
        return membership.getMember(mbr);
    }

    /**
     * Delegates to the next interceptor.
     *
     * @param incAlive passed through unchanged
     * @return the local member as reported by the layer below
     */
    public Member getLocalMember(boolean incAlive) {
        return super.getLocalMember(incAlive);
    }
    
    /**
     * Periodic reconciliation, performed under the membership lock:
     * <ol>
     * <li>Members known to the layer below but missing from the local view are probed and,
     *     if reachable, announced upwards. Members still pending as add-suspects are left
     *     to step 3.</li>
     * <li>Remove-suspects that no longer answer the probe are removed and reported as
     *     disappeared.</li>
     * <li>Add-suspects that now answer the probe are added and announced upwards.</li>
     * </ol>
     * Any exception is logged; the heartbeat is always passed on to the next interceptor.
     */
    public void heartbeat() {
        try {
            if (membership == null) setupMembership();
            synchronized (membership) {
                //update all alive times
                Member[] members = super.getMembers();
                for (int i = 0; members != null && i < members.length; i++) {
                    if (addSuspects.containsKey(members[i]) && membership.getMember(members[i]) == null) {
                        //still pending verification: handled by the add-suspect loop below.
                        //Registering it here would expose an unverified member through
                        //getMembers() until the probe below fails and removes it again.
                        continue;
                    }
                    //a true result is treated as "member was not in our view before this call"
                    if (membership.memberAlive( (MemberImpl) members[i])) {
                        //we don't have this one in our membership, check to see if he/she is alive
                        if (memberAlive(members[i])) {
                            log.warn("Member added, even though we werent notified:" + members[i]);
                            super.memberAdded(members[i]);
                        } else {
                            membership.removeMember( (MemberImpl) members[i]);
                        } //end if
                    } //end if
                } //for

                //check suspect members if they are still alive,
                //if not, simply issue the memberDisappeared message
                MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
                for (int i = 0; i < keys.length; i++) {
                    MemberImpl m = keys[i];
                    if (membership.getMember(m) == null) {
                        //not part of our view, so there is nothing to remove or report
                        removeSuspects.remove(m);
                    } else if (!memberAlive(m)) {
                        membership.removeMember(m);
                        super.memberDisappeared(m);
                        removeSuspects.remove(m);
                        log.info("Suspect member, confirmed dead.["+m+"]");
                    } //end if
                }

                //check add suspects members if they are alive now,
                //if they are, simply issue the memberAdded message
                keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
                for (int i = 0; i < keys.length; i++) {
                    MemberImpl m = keys[i];
                    if (membership.getMember(m) != null) {
                        //already part of our view, no longer a suspect
                        addSuspects.remove(m);
                    } else if (memberAlive(m)) {
                        membership.memberAlive(m);
                        super.memberAdded(m);
                        addSuspects.remove(m);
                        log.info("Suspect member, confirmed alive.["+m+"]");
                    } //end if
                }
            }
        }catch ( Exception x ) {
            log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
        } finally {
            super.heartbeat();
        }
    }
    
    /**
     * Creates the local {@link Membership} view, seeded with the local member, if it does
     * not exist yet.
     */
    protected synchronized void setupMembership() {
        if ( membership == null ) {
            membership = new Membership((MemberImpl)super.getLocalMember(true));
        }
        
    }
    
    /**
     * Probes the member using this interceptor's configured tests, timeouts and option flag.
     *
     * @param mbr the member to probe
     * @return true if the member is considered reachable
     */
    protected boolean memberAlive(Member mbr) {
        return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
    }
    
    /**
     * Probes a member by opening a TCP connection to its host and port.
     *
     * @param mbr the member to probe
     * @param msgData payload written during the send test
     * @param sendTest if true, a message carrying <code>msgData</code> is written after connecting
     * @param readTest if true (and <code>sendTest</code> is true), the message requests an ack
     *                 and the probe only succeeds if at least one byte can be read back
     * @param readTimeout socket read timeout in milliseconds (narrowed to int)
     * @param conTimeout connect timeout in milliseconds (narrowed to int)
     * @param optionFlag option bits OR-ed into the probe message options
     * @return true if the member is considered reachable; false if it carries the shutdown
     *         payload, could not be reached, or any other error occurred
     */
    protected static boolean memberAlive(Member mbr, byte[] msgData, 
                                         boolean sendTest, boolean readTest,
                                         long readTimeout, long conTimeout,
                                         int optionFlag) {
        //could be a shutdown notification
        if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
        
        Socket socket = new Socket();        
        try {
            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
            socket.setSoTimeout((int)readTimeout);
            socket.connect(addr, (int) conTimeout);
            if ( sendTest ) {
                ChannelData data = new ChannelData(true);
                data.setAddress(mbr);
                data.setMessage(new XByteBuffer(msgData,false));
                data.setTimestamp(System.currentTimeMillis());
                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
                //only ask for an ack when we are going to wait for one
                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
                else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
                data.setOptions(options);
                byte[] message = XByteBuffer.createDataPackage(data);
                socket.getOutputStream().write(message);
                if ( readTest ) {
                    //the outgoing buffer is reused as scratch space for the reply
                    int length = socket.getInputStream().read(message);
                    return length > 0;
                }
            }//end if
            return true;
        } catch ( SocketTimeoutException sx) {
            //do nothing, we couldn't connect
        } catch ( ConnectException cx) {
            //do nothing, we couldn't connect
        } catch ( NoRouteToHostException nx) {
            //do nothing, we couldn't connect
        }catch (Exception x ) {
            log.error("Unable to perform failure detection check, assuming member down.",x);
        } finally {
            //best effort: a failure to close does not change the probe result
            try {socket.close(); } catch ( Exception ignore ){}
        }
        return false;
    }

}