/*                                                                             
 * Copyright 2004,2005 The Apache Software Foundation.                         
 *                                                                             
 * Licensed under the Apache License, Version 2.0 (the "License");             
 * you may not use this file except in compliance with the License.            
 * You may obtain a copy of the License at                                     
 *                                                                             
 *      http://www.apache.org/licenses/LICENSE-2.0                             
 *                                                                             
 * Unless required by applicable law or agreed to in writing, software         
 * distributed under the License is distributed on an "AS IS" BASIS,           
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
 * See the License for the specific language governing permissions and         
 * limitations under the License.                                              
 */
package org.apache.axis2.clustering.tribes;

import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.Member;
import org.apache.axis2.clustering.MembershipScheme;
import org.apache.axis2.clustering.MembershipListener;
import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.clustering.control.wka.RpcMembershipRequestHandler;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.util.Utils;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
import org.apache.catalina.tribes.membership.StaticMember;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Implementation of the WKA(well-known address) based membership scheme. In this scheme,
 * membership is discovered using a few well-known members (who run at well-known IP addresses)
 */
public class WkaBasedMembershipScheme implements MembershipScheme {

    private static final Log log = LogFactory.getLog(WkaBasedMembershipScheme.class);

    /**
     * The Tribes channel
     */
    private ManagedChannel channel;
    private MembershipManager primaryMembershipManager;
    private List<MembershipManager> applicationDomainMembershipManagers;
    private StaticMembershipInterceptor staticMembershipInterceptor;
    private Map<String, Parameter> parameters;

    /**
     * The loadBalancerDomain to which the members belong to
     */
    private byte[] localDomain;

    /**
     * The static(well-known) members
     */
    private List<Member> members;

    /**
     * The mode in which this member operates such as "loadBalance" or "application"
     */
    private OperationMode mode;

    private boolean atmostOnceMessageSemantics;
    private boolean preserverMsgOrder;

    public WkaBasedMembershipScheme(ManagedChannel channel,
                                    OperationMode mode,
                                    List<MembershipManager> applicationDomainMembershipManagers,
                                    MembershipManager primaryMembershipManager,
                                    Map<String, Parameter> parameters,
                                    byte[] domain,
                                    List<Member> members,
                                    boolean atmostOnceMessageSemantics,
                                    boolean preserverMsgOrder) {
        this.channel = channel;
        this.mode = mode;
        this.applicationDomainMembershipManagers = applicationDomainMembershipManagers;
        this.primaryMembershipManager = primaryMembershipManager;
        this.parameters = parameters;
        this.localDomain = domain;
        this.members = members;
        this.atmostOnceMessageSemantics = atmostOnceMessageSemantics;
        this.preserverMsgOrder = preserverMsgOrder;
    }

    /**
     * Configure the membership related to the WKA based scheme
     *
     * @throws org.apache.axis2.clustering.ClusteringFault
     *          If an error occurs while configuring this scheme
     */
    public void init() throws ClusteringFault {
        addInterceptors();
        configureStaticMembership();
    }

    private void configureStaticMembership() throws ClusteringFault {
        channel.setMembershipService(new WkaMembershipService(primaryMembershipManager));
        StaticMember localMember = new StaticMember();
        primaryMembershipManager.setLocalMember(localMember);
        ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();

        // ------------ START: Configure and add the local member ---------------------
        Parameter localHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
        String host;
        if (localHost != null) {
            host = ((String) localHost.getValue()).trim();
        } else { // In cases where the localhost needs to be automatically figured out
            try {
                try {
                    host = Utils.getIpAddress();
                } catch (SocketException e) {
                    String msg = "Could not get local IP address";
                    log.error(msg, e);
                    throw new ClusteringFault(msg, e);
                }
            } catch (Exception e) {
                String msg = "Could not get the localhost name";
                log.error(msg, e);
                throw new ClusteringFault(msg, e);
            }
        }
        receiver.setAddress(host);
        try {
            localMember.setHostname(host);
        } catch (IOException e) {
            String msg = "Could not set the local member's name";
            log.error(msg, e);
            throw new ClusteringFault(msg, e);
        }

        Parameter localPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
        int port;
        try {
            if (localPort != null) {
                port = Integer.parseInt(((String) localPort.getValue()).trim());
                port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
            } else { // In cases where the localport needs to be automatically figured out
                port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
            }
        } catch (IOException e) {
            String msg =
                    "Could not allocate the specified port or a port in the range 4000-4100 " +
                    "for local host " + localMember.getHostname() +
                    ". Check whether the IP address specified or inferred for the local " +
                    "member is correct.";
            log.error(msg, e);
            throw new ClusteringFault(msg, e);
        }

        byte[] payload = "ping".getBytes();
        localMember.setPayload(payload);
        receiver.setPort(port);
        localMember.setPort(port);
        localMember.setDomain(localDomain);
        staticMembershipInterceptor.setLocalMember(localMember);

        // ------------ END: Configure and add the local member ---------------------

        // ------------ START: Add other members ---------------------
        for (Member member : members) {
            StaticMember tribesMember;
            try {
                tribesMember = new StaticMember(member.getHostName(), member.getPort(),
                                                0, payload);
            } catch (IOException e) {
                String msg = "Could not add static member " +
                             member.getHostName() + ":" + member.getPort();
                log.error(msg, e);
                throw new ClusteringFault(msg, e);
            }

            // Do not add the local member to the list of members
            if (!(Arrays.equals(localMember.getHost(), tribesMember.getHost()) &&
                  localMember.getPort() == tribesMember.getPort())) {
                tribesMember.setDomain(localDomain);

                // We will add the member even if it is offline at this moment. When the
                // member comes online, it will be detected by the GMS
                staticMembershipInterceptor.addStaticMember(tribesMember);
                primaryMembershipManager.addWellKnownMember(tribesMember);
                if (canConnect(member)) {
                    primaryMembershipManager.memberAdded(tribesMember);
                    log.info("Added static member " + TribesUtil.getName(tribesMember));
                } else {
                    log.info("Could not connect to member " + TribesUtil.getName(tribesMember));
                }
            }
        }
    }

    /**
     * Before adding a static member, we will try to verify whether we can connect to it
     *
     * @param member The member whose connectvity needs to be verified
     * @return true, if the member can be contacted; false, otherwise.
     */
    private boolean canConnect(org.apache.axis2.clustering.Member member) {
        for (int retries = 5; retries > 0; retries--) {
            try {
                InetAddress addr = InetAddress.getByName(member.getHostName());
                SocketAddress sockaddr = new InetSocketAddress(addr,
                                                               member.getPort());
                new Socket().connect(sockaddr, 500);
                return true;
            } catch (IOException e) {
                String msg = e.getMessage();
                if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
                    log.error("Cannot connect to member " +
                              member.getHostName() + ":" + member.getPort(), e);
                }
            }
        }
        return false;
    }

    protected int getLocalPort(ServerSocket socket, String hostname,
                               int preferredPort, int portstart, int retries) throws IOException {
        if (preferredPort != -1) {
            try {
                return getLocalPort(socket, hostname, preferredPort);
            } catch (IOException ignored) {
                // Fall through and try a default port
            }
        }
        InetSocketAddress addr = null;
        if (retries > 0) {
            try {
                return getLocalPort(socket, hostname, portstart);
            } catch (IOException x) {
                retries--;
                if (retries <= 0) {
                    log.error("Unable to bind server socket to:" + addr + " throwing error.");
                    throw x;
                }
                portstart++;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                    ignored.printStackTrace();
                }
                getLocalPort(socket, hostname, portstart, retries, -1);
            }
        }
        return portstart;
    }

    private int getLocalPort(ServerSocket socket, String hostname, int port) throws IOException {
        InetSocketAddress addr;
        addr = new InetSocketAddress(hostname, port);
        socket.bind(addr);
        log.info("Receiver Server Socket bound to:" + addr);
        socket.setSoTimeout(5);
        socket.close();
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
            ignored.printStackTrace();
        }
        return port;
    }

    /**
     * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
     * membership management scheme
     */
    private void addInterceptors() {

        if (log.isDebugEnabled()) {
            log.debug("Adding Interceptors...");
        }
        TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
        tcpPingInterceptor.setInterval(100);
        channel.addInterceptor(tcpPingInterceptor);
        if (log.isDebugEnabled()) {
            log.debug("Added TCP Ping Interceptor");
        }

        // Add a reliable failure detector
        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
//        tcpFailureDetector.setPrevious(dfi); //TODO: check this
        tcpFailureDetector.setReadTestTimeout(120000);
        tcpFailureDetector.setConnectTimeout(60000);
        channel.addInterceptor(tcpFailureDetector);
        if (log.isDebugEnabled()) {
            log.debug("Added TCP Failure Detector");
        }

        // Add the NonBlockingCoordinator.
//        channel.addInterceptor(new Axis2Coordinator(membershipListener));

        staticMembershipInterceptor = new StaticMembershipInterceptor();
        staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
        primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
        channel.addInterceptor(staticMembershipInterceptor);
        if (log.isDebugEnabled()) {
            log.debug("Added Static Membership Interceptor");
        }

        channel.getMembershipService().setDomain(localDomain);
        mode.addInterceptors(channel);

        if (atmostOnceMessageSemantics) {
            // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
            AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
            atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
            channel.addInterceptor(atMostOnceInterceptor);
            if (log.isDebugEnabled()) {
                log.debug("Added At-most-once Interceptor");
            }
        }

        if (preserverMsgOrder) {
            // Add the OrderInterceptor to preserve sender ordering
            OrderInterceptor orderInterceptor = new OrderInterceptor();
            orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
            channel.addInterceptor(orderInterceptor);
            if (log.isDebugEnabled()) {
                log.debug("Added Message Order Interceptor");
            }
        }
    }

    /**
     * JOIN the group and get the member list
     *
     * @throws ClusteringFault If an error occurs while joining the group
     */
    public void joinGroup() throws ClusteringFault {

        // Have multiple RPC channels with multiple RPC request handlers for each localDomain
        // This is needed only when this member is running as a load balancer
        for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
            appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);

            // Create an RpcChannel for each localDomain
            String domain = new String(appDomainMembershipManager.getDomain());
            RpcChannel rpcMembershipChannel =
                    new RpcChannel(TribesUtil.getRpcMembershipChannelId(appDomainMembershipManager.getDomain()),
                                   channel,
                                   new RpcMembershipRequestHandler(appDomainMembershipManager,
                                                                   this));
            appDomainMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
            if (log.isDebugEnabled()) {
                log.debug("Created RPC Membership Channel for application domain " + domain);
            }
        }

        // Create a Membership channel for handling membership requests
        RpcChannel rpcMembershipChannel =
                new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
                               channel, new RpcMembershipRequestHandler(primaryMembershipManager,
                                                                        this));
        if (log.isDebugEnabled()) {
            log.debug("Created primary membership channel " + new String(localDomain));
        }
        primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);

        // Send JOIN message to a WKA member
        if (primaryMembershipManager.getMembers().length > 0) {
            log.info("Sending JOIN message to WKA members...");
            org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members
            /*try {
                Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
            } catch (InterruptedException ignored) {
            }*/   //TODO: #### Need to double check whether this sleep is necessary
            Response[] responses = null;
            do {
                try {
                    responses = rpcMembershipChannel.send(wkaMembers,
                                                          new JoinGroupCommand(),
                                                          RpcChannel.ALL_REPLY,
                                                          Channel.SEND_OPTIONS_ASYNCHRONOUS |
                                                          TribesConstants.MEMBERSHIP_MSG_OPTION,
                                                          10000);
                    if (responses.length == 0) {
                        try {
                            if (log.isDebugEnabled()) {
                                log.debug("No responses received");
                            }
                            Thread.sleep(500);
                        } catch (InterruptedException ignored) {
                        }
                    }
                } catch (Exception e) {
                    String msg = "Error occurred while trying to send JOIN request to WKA members";
                    log.error(msg, e);
                    wkaMembers = primaryMembershipManager.getMembers();
                    if (wkaMembers.length == 0) {
                        log.warn("There are no well-known members");
                        break;
                    }
                }

                // TODO: If we do not get a response within some time, try to recover from this fault
            }
            while (responses == null || responses.length == 0);  // Wait until we've received at least one response

            for (Response response : responses) {
                MemberListCommand command = (MemberListCommand) response.getMessage();
                command.setMembershipManager(primaryMembershipManager);
                command.execute(null); // Set the list of current members

                // If the WKA member is not part of this group, remove it
                if (!TribesUtil.areInSameDomain(response.getSource(),
                                                primaryMembershipManager.getLocalMember())) {
                    primaryMembershipManager.memberDisappeared(response.getSource());
                    if (log.isDebugEnabled()) {
                        log.debug("Removed member " + TribesUtil.getName(response.getSource()) +
                                  " since it does not belong to the local domain " +
                                  new String(primaryMembershipManager.getLocalMember().getDomain()));
                    }
                }
            }
        }
    }

    /**
     * When a JOIN message is received from some other member, it is notified using this method,
     * so that membership scheme specific processing can be carried out
     *
     * @param member The member who just joined
     */
    public void processJoin(org.apache.catalina.tribes.Member member) {
        mode.notifyMemberJoin(member);
    }


    public Parameter getParameter(String name) {
        return parameters.get(name);
    }
}
