package org.apache.commons.jcs3.utils.discovery;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;

/**
 * Receives UDP discovery messages from a multicast group.
 * <p>
 * An instance joins the configured multicast group on construction. When run, it loops on
 * {@link #waitForMessage()}, handing every received {@link UDPDiscoveryMessage} to a small
 * thread pool which forwards it to the {@link UDPDiscoveryService}. {@link #shutdown()} leaves
 * the group, closes the socket and stops the pool.
 */
public class UDPDiscoveryReceiver
    implements Runnable, IShutdownObserver
{
    /** The log factory */
    private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );

    /** Receive buffer, reused for every incoming datagram. */
    private final byte[] mBuffer = new byte[65536];

    /** The socket used for communication. */
    private MulticastSocket mSocket;

    /**
     * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
     * restriction on the pool size
     */
    private static final int maxPoolSize = 2;

    /** The processor */
    private final ExecutorService pooledExecutor;

    /** number of messages received. For debugging and testing. */
    private final AtomicInteger cnt = new AtomicInteger(0);

    /** Service to get cache names and handle request broadcasts */
    private final UDPDiscoveryService service;

    /** Multicast address */
    private final InetAddress multicastAddress;

    /** Is it shutdown. */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    /**
     * Constructor for the UDPDiscoveryReceiver object.
     * <p>
     * Resolves the multicast address, creates the handler thread pool and joins the multicast
     * group. If the socket cannot be set up, the thread pool is shut down again before the
     * exception is propagated.
     * <p>
     * @param service the discovery service that is notified about received messages
     * @param multicastInterfaceString name of the network interface to use, may be null
     * @param multicastAddressString the multicast group address
     * @param multicastPort the multicast port
     * @throws IOException if the address cannot be resolved or the socket cannot be set up
     */
    public UDPDiscoveryReceiver( final UDPDiscoveryService service, final String multicastInterfaceString,
            final String multicastAddressString, final int multicastPort )
        throws IOException
    {
        this.service = service;
        this.multicastAddress = InetAddress.getByName( multicastAddressString );

        // create a small thread pool to handle a barrage
        this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
                new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
                        WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
                "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);

        log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );

        try
        {
            createSocket( multicastInterfaceString, multicastAddress, multicastPort );
        }
        catch ( final IOException | RuntimeException e )
        {
            // The instance is never handed out, so nobody else could stop the pool.
            pooledExecutor.shutdownNow();
            throw e;
        }
    }

    /**
     * Creates the socket for this class and joins the multicast group.
     * <p>
     * The socket is only published to {@link #mSocket} once the group has been joined. If any
     * step fails, a socket that was already opened is closed again.
     * <p>
     * @param multicastInterfaceString name of the network interface to use, may be null
     * @param multicastAddress the multicast group address
     * @param multicastPort the multicast port
     * @throws IOException if the socket cannot be created or the group cannot be joined
     */
    private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
            final int multicastPort )
        throws IOException
    {
        MulticastSocket socket = null;
        try
        {
            socket = new MulticastSocket( multicastPort );
            if (log.isInfoEnabled())
            {
                log.info( "Joining Group: [{0}]", multicastAddress );
            }

            // Use dedicated interface if specified
            NetworkInterface multicastInterface = null;
            if (multicastInterfaceString != null)
            {
                multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
            }
            else
            {
                multicastInterface = HostNameUtil.getMulticastNetworkInterface();
            }
            if (multicastInterface != null)
            {
                log.info("Using network interface {0}", multicastInterface.getDisplayName());
                socket.setNetworkInterface(multicastInterface);
            }

            socket.joinGroup( multicastAddress );
            mSocket = socket;
        }
        catch ( final IOException e )
        {
            // Do not leak a socket that was opened but could not be fully configured.
            if ( socket != null )
            {
                socket.close();
            }
            log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
                    multicastPort, e );
            throw e;
        }
    }

    /**
     * Highly unreliable. If it is processing one message while another comes in, the second
     * message is lost. This is for low concurrency peppering.
     * <p>
     * Blocks until a datagram arrives and deserializes its payload. Failures while receiving or
     * deserializing are caught and logged here, in which case {@code null} is returned.
     * <p>
     * @return the object message, or null if nothing could be read
     * @throws IOException declared for compatibility; this implementation logs failures instead
     */
    public Object waitForMessage()
        throws IOException
    {
        final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
        Object obj = null;
        try
        {
            log.debug( "Waiting for message." );

            mSocket.receive( packet );

            log.debug( "Received packet from address [{0}]",
                    () -> packet.getSocketAddress() );

            // NOTE(review): this is Java-native deserialization of data read from the network.
            // Consider restricting the accepted classes if the multicast group is not trusted.
            try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength());
                 ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null))
            {
                obj = objectStream.readObject();
            }

            if ( obj instanceof UDPDiscoveryMessage )
            {
                // Ensure that the address we're supposed to send to is, indeed, the address
                // of the machine on the other end of this connection.  This guards against
                // instances where we don't exactly get the right local host address
                final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
                msg.setHost(packet.getAddress().getHostAddress());

                log.debug( "Read object from address [{0}], object=[{1}]",
                        packet.getSocketAddress(), obj );
            }
        }
        catch ( final Exception e )
        {
            if ( shutdown.get() )
            {
                // shutdown() closes the socket underneath a blocked receive; that is expected.
                log.debug( "Receiver was shut down while waiting for a message." );
            }
            else
            {
                log.error( "Error receiving multicast packet", e );
            }
        }

        return obj;
    }

    /**
     * Main processing method for the UDPDiscoveryReceiver object.
     * <p>
     * Receives messages until {@link #shutdown()} has been called and passes each
     * {@link UDPDiscoveryMessage} to the thread pool for handling.
     */
    @Override
    public void run()
    {
        try
        {
            while (!shutdown.get())
            {
                final Object obj = waitForMessage();

                if ( shutdown.get() )
                {
                    // Woken up by shutdown: nothing was received, so do not count or warn.
                    break;
                }

                cnt.incrementAndGet();

                log.debug( "{0} messages received.", this::getCnt );

                if ( obj instanceof UDPDiscoveryMessage )
                {
                    final UDPDiscoveryMessage message = (UDPDiscoveryMessage) obj;
                    pooledExecutor.execute(() -> handleMessage(message));
                    log.debug( "Passed handler to executor." );
                }
                else if ( obj == null )
                {
                    log.warn( "message is null" );
                }
                else
                {
                    log.warn( "Received unknown message type {0}", obj.getClass().getName() );
                }
            } // end while
        }
        catch ( final IOException e )
        {
            log.error( "Unexpected exception in UDP receiver.", e );
            try
            {
                Thread.sleep( 100 );
                // TODO consider some failure count so we don't do this
                // forever.
            }
            catch ( final InterruptedException e2 )
            {
                log.error( "Problem sleeping", e2 );
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @param cnt The cnt to set.
     */
    public void setCnt( final int cnt )
    {
        this.cnt.set(cnt);
    }

    /**
     * @return Returns the cnt.
     */
    public int getCnt()
    {
        return cnt.get();
    }

    /**
     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
     * @deprecated No longer used
     */
    @Deprecated
    public class MessageHandler
        implements Runnable
    {
        /** The message to handle. Passed in during construction. */
        private final UDPDiscoveryMessage message;

        /**
         * @param message the message to handle
         */
        public MessageHandler( final UDPDiscoveryMessage message )
        {
            this.message = message;
        }

        /**
         * Process the message.
         */
        @Override
        public void run()
        {
            handleMessage(message);
        }

    }

    /**
     * Handles a received message on a pool thread.
     * <p>
     * Messages whose requester id equals the local listener id are ignored, as are messages
     * without a host or without cache names. Everything else goes to
     * {@link #processMessage(UDPDiscoveryMessage)}.
     * <p>
     * @param message the received message
     */
    private void handleMessage(UDPDiscoveryMessage message)
    {
        // consider comparing ports here instead.
        if ( message.getRequesterId() == CacheInfo.listenerId )
        {
            log.debug( "Ignoring message sent from self" );
        }
        else
        {
            log.debug( "Process message sent from another" );
            log.debug( "Message = {0}", message );

            if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
            {
                log.debug( "Ignoring invalid message: {0}", message );
            }
            else
            {
                processMessage(message);
            }
        }
    }

    /**
     * Process the incoming message by dispatching on its message type.
     * <p>
     * @param message the received message
     */
    private void processMessage(UDPDiscoveryMessage message)
    {
        final DiscoveredService discoveredService = new DiscoveredService(message);

        switch (message.getMessageType())
        {
            case REMOVE:
                log.debug( "Removing service from set {0}", discoveredService );
                service.removeDiscoveredService( discoveredService );
                break;
            case REQUEST:
                // if this is a request message, have the service handle it and
                // return
                log.debug( "Message is a Request Broadcast, will have the service handle it." );
                service.serviceRequestBroadcast();
                break;
            case PASSIVE:
            default:
                service.addOrUpdateService( discoveredService );
                break;
        }
    }

    /**
     * Shuts down the receiver: leaves the multicast group, closes the socket and stops the
     * thread pool. Only the first call has an effect.
     */
    @Override
    public void shutdown()
    {
        if (shutdown.compareAndSet(false, true))
        {
            try
            {
                mSocket.leaveGroup( multicastAddress );
            }
            catch ( final IOException e )
            {
                log.error( "Problem closing socket", e );
            }
            finally
            {
                // Always release the socket and the pool, even if leaving the group failed.
                mSocket.close();
                pooledExecutor.shutdownNow();
            }
        }
    }
}
