package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;

/**
 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
 * it is significantly different.
 */
public class LateralTCPSender
{
    /** The logger */
    private static final Log log = LogManager.getLog( LateralTCPSender.class );

    /** Config */
    private final int socketOpenTimeOut;
    private final int socketSoTimeOut;

    /** The stream from the server connection. */
    private ObjectOutputStream oos;

    /** The socket connection with the server. */
    private Socket socket;

    /** how many messages sent */
    private int sendCnt = 0;

    /** Use to synchronize multiple threads that may be trying to get. */
    private final Object getLock = new int[0];

    /**
     * Constructor for the LateralTCPSender object.
     * <p>
     * @param lca
     * @throws IOException
     */
    public LateralTCPSender( ITCPLateralCacheAttributes lca )
        throws IOException
    {
        this.socketOpenTimeOut = lca.getOpenTimeOut();
        this.socketSoTimeOut = lca.getSocketTimeOut();

        String p1 = lca.getTcpServer();
        if ( p1 == null )
        {
            throw new IOException( "Invalid server (null)" );
        }

        String h2 = p1.substring( 0, p1.indexOf( ":" ) );
        int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
        log.debug( "h2 = {0}, po = {1}", h2, po );

        if ( h2.length() == 0 )
        {
            throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
        }

        init( h2, po );
    }

    /**
     * Creates a connection to a TCP server.
     * <p>
     * @param host
     * @param port
     * @throws IOException
     */
    protected void init( String host, int port )
        throws IOException
    {
        try
        {
            log.info( "Attempting connection to [{0}]", host );

            // have time out socket open do this for us
            try
            {
                socket = new Socket();
                socket.connect( new InetSocketAddress( host, port ), this.socketOpenTimeOut );
            }
            catch ( IOException ioe )
            {
                if (socket != null)
                {
                    socket.close();
                }

                throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
            }

            socket.setSoTimeout( socketSoTimeOut );
            synchronized ( this )
            {
                oos = new ObjectOutputStream( socket.getOutputStream() );
            }
        }
        catch ( java.net.ConnectException e )
        {
            log.debug( "Remote host [{0}] refused connection.", host );
            throw e;
        }
        catch ( IOException e )
        {
            log.debug( "Could not connect to [{0}]", host, e );
            throw e;
        }
    }

    /**
     * Sends commands to the lateral cache listener.
     * <p>
     * @param led
     * @throws IOException
     */
    public <K, V> void send( LateralElementDescriptor<K, V> led )
        throws IOException
    {
        sendCnt++;
        if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
        {
            log.info( "Send Count (port {0}) = {1}", socket.getPort(), sendCnt );
        }

        log.debug( "sending LateralElementDescriptor" );

        if ( led == null )
        {
            return;
        }

        if ( oos == null )
        {
            throw new IOException( "No remote connection is available for LateralTCPSender." );
        }

        synchronized ( this.getLock )
        {
            oos.writeUnshared( led );
            oos.flush();
        }
    }

    /**
     * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
     * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
     * form of get working. However, get is not recommended for performance reasons. If you have 10
     * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
     * <p>
     * @param led
     * @return ICacheElement
     * @throws IOException
     */
    public <K, V> Object sendAndReceive( LateralElementDescriptor<K, V> led )
        throws IOException
    {
        if ( led == null )
        {
            return null;
        }

        if ( oos == null )
        {
            throw new IOException( "No remote connection is available for LateralTCPSender." );
        }

        Object response = null;

        // Synchronized to insure that the get requests to server from this
        // sender and the responses are processed in order, else you could
        // return the wrong item from the cache.
        // This is a big block of code. May need to re-think this strategy.
        // This may not be necessary.
        // Normal puts, etc to laterals do not have to be synchronized.
        synchronized ( this.getLock )
        {
            try
            {
                // clean up input stream, nothing should be there yet.
                if ( socket.getInputStream().available() > 0 )
                {
                    socket.getInputStream().read( new byte[socket.getInputStream().available()] );
                }
            }
            catch ( IOException ioe )
            {
                log.error( "Problem cleaning socket before send {0}", socket, ioe );
                throw ioe;
            }

            // write object to listener
            oos.writeUnshared( led );
            oos.flush();

            try (ObjectInputStream ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null ))
            {
                socket.setSoTimeout( socketSoTimeOut );
                response = ois.readObject();
            }
            catch ( IOException ioe )
            {
                String message = "Could not open ObjectInputStream to " + socket +
                    " SoTimeout [" + socket.getSoTimeout() +
                    "] Connected [" + socket.isConnected() + "]";
                log.error( message, ioe );
                throw ioe;
            }
            catch ( Exception e )
            {
                log.error( e );
            }
        }

        return response;
    }

    /**
     * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
     * should come into the facade and be sent to all lateral cache services. The lateral cache
     * service will then call this method.
     * <p>
     * @throws IOException
     */
    public void dispose()
        throws IOException
    {
        log.info( "Dispose called" );
        // WILL CLOSE CONNECTION USED BY ALL
        oos.close();
        socket.close();
    }
}
