/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client.handler;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import javax.jms.*;
import javax.jms.IllegalStateException;

import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.qpid.QpidException;
import org.apache.qpid.client.CommonConnection;
import org.apache.qpid.client.PooledConnectionFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidTestCase;

public class PooledConnectionFactoryTest extends QpidTestCase
{
    private interface CommonConnectionCreator
    {
        CommonConnection newConnection(ConnectionURL connectionUrl);
    }

    private CommonConnectionCreator _connectionCreator;

    private PooledConnectionFactory _connectionFactory;

    @Override
    public void setUp() throws Exception
    {
        super.setUp();
        _connectionCreator = mock(CommonConnectionCreator.class);

        _connectionFactory = new PooledConnectionFactory()
        {
            @Override
            protected CommonConnection newConnectionInstance(final ConnectionURL connectionDetails) throws QpidException
            {
                return _connectionCreator.newConnection(connectionDetails);
            }
        };
    }


    public void testConnectionCreatedWithUrlUserAndPassword() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");

        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();

        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();
                assertEquals("user", url.getUsername());
                assertEquals("pass", url.getPassword());

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn = _connectionFactory.createConnection();
        assertNotNull(conn);
        assertEquals(1, createdConnections.size());
        final String connToString = conn.toString();
        assertTrue(connToString.endsWith(createdConnections.get(0).toString()));

        conn.close();

        Connection conn2 = _connectionFactory.createConnection();
        assertNotNull(conn2);
        assertEquals(1, createdConnections.size());
        assertTrue(conn2.toString().endsWith(createdConnections.get(0).toString()));


        try
        {
            conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            fail("Closed connection should not allow sessions to be created");
        }
        catch (IllegalStateException e)
        {
            // pass
        }

        Connection conn3 = _connectionFactory.createConnection();
        assertNotNull(conn3);
        assertEquals(2, createdConnections.size());
        assertTrue(conn3.toString().endsWith(createdConnections.get(1).toString()));
        assertFalse(conn3.toString().equals(conn2.toString()));

        conn2.close();
        Connection conn4 = _connectionFactory.createConnection();
        assertNotNull(conn4);
        assertEquals(2, createdConnections.size());
        assertTrue(conn4.toString().endsWith(createdConnections.get(0).toString()));

    }

    public void testConnectionCreatedPassedUserAndPassword() throws Exception
    {

        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");

        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();
        final List<String> usernames = new ArrayList<>();
        final List<String> passwords = new ArrayList<>();

        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();
                usernames.add(url.getUsername());
                passwords.add(url.getPassword());

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn = _connectionFactory.createConnection("user1", "pass1");
        assertNotNull(conn);
        assertEquals(1, createdConnections.size());
        assertEquals("user1", usernames.get(0));
        assertEquals("pass1", passwords.get(0));
        conn.close();
        Connection conn2 = _connectionFactory.createConnection("user2", "pass2");
        assertNotNull(conn2);
        assertEquals(2, createdConnections.size());
        assertEquals("user2", usernames.get(1));
        assertEquals("pass2", passwords.get(1));
        conn2.close();
        Connection conn3 = _connectionFactory.createConnection("user1", "pass1");
        assertNotNull(conn3);
        assertEquals(2, createdConnections.size());
        assertTrue(conn3.toString().endsWith(createdConnections.get(0).toString()));


    }

    public void testMaxPoolSize() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
        _connectionFactory.setMaxPoolSize(3);
        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();

        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn1 = _connectionFactory.createQueueConnection();
        Connection conn2 = _connectionFactory.createQueueConnection();
        Connection conn3 = _connectionFactory.createQueueConnection();
        Connection conn4 = _connectionFactory.createQueueConnection();

        assertEquals(4, createdConnections.size());

        conn1.close();
        conn2.close();
        conn3.close();
        conn4.close();

        Connection conn5 = _connectionFactory.createTopicConnection();
        assertEquals(4, createdConnections.size());
        Connection conn6 = _connectionFactory.createTopicConnection();
        assertEquals(4, createdConnections.size());
        Connection conn7 = _connectionFactory.createTopicConnection();
        assertEquals(4, createdConnections.size());
        Connection conn8 = _connectionFactory.createTopicConnection();
        assertEquals(5, createdConnections.size());

    }

    public void testConnectionTimeout() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
        _connectionFactory.setMaxPoolSize(4);
        _connectionFactory.setConnectionTimeout(100l);
        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();

        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn1 = _connectionFactory.createQueueConnection();
        Connection conn2 = _connectionFactory.createQueueConnection();
        Connection conn3 = _connectionFactory.createQueueConnection();
        Connection conn4 = _connectionFactory.createQueueConnection();

        assertEquals(4, createdConnections.size());

        conn1.close();
        conn2.close();
        conn3.close();
        conn4.close();

        Thread.sleep(500l);

        Connection conn5 = _connectionFactory.createTopicConnection();
        assertEquals(5, createdConnections.size());
        Connection conn6 = _connectionFactory.createTopicConnection();
        assertEquals(6, createdConnections.size());
        Connection conn7 = _connectionFactory.createTopicConnection();
        assertEquals(7, createdConnections.size());
        Connection conn8 = _connectionFactory.createTopicConnection();
        assertEquals(8, createdConnections.size());


    }

    public void testSessionsAreClosed() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");

        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();
        final Session session = mock(Session.class);
        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                when(connection.createSession(anyBoolean(),anyInt())).thenReturn(session);
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn = _connectionFactory.createConnection();
        Session createdSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        conn.close();
        verify(session, times(1)).close();

    }

    public void testConnectionWithExceptionNotPooled() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");

        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();
        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                when(connection.createSession(anyBoolean(),anyInt())).thenThrow(new JMSException("foo"));
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn = _connectionFactory.createConnection();
        try
        {
            Session createdSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            fail("Expected an exception");
        }
        catch(JMSException e)
        {
            assertEquals("foo", e.getMessage());
        }
        conn.close();
        Connection conn2 = _connectionFactory.createConnection();
        assertEquals(2, createdConnections.size());
    }


    public void testConnectionWithExceptionListenerExceptionNotPooled() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");


        final List<CommonConnection> createdConnections = new ArrayList<>();

        when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                createdConnections.add(connection);
                final ArgumentCaptor<ExceptionListener> listenerCaptor =
                        ArgumentCaptor.forClass(ExceptionListener.class);

                doAnswer(new Answer()
                {
                    @Override
                    public Object answer(final InvocationOnMock invocation) throws Throwable
                    {
                        when(connection.getExceptionListener()).thenReturn(listenerCaptor.getValue());
                        return null;
                    }
                }).when(connection).setExceptionListener(listenerCaptor.capture());

                return connection;
            }
        });

        Connection conn = _connectionFactory.createConnection();
        assertEquals(1, createdConnections.size());

        ExceptionListener listener = mock(ExceptionListener.class);
        conn.setExceptionListener(listener);
        assertEquals(listener, conn.getExceptionListener());
        verify(listener, never()).onException(any(JMSException.class));
        createdConnections.get(0).getExceptionListener().onException(new JMSException("bar"));
        verify(listener, times(1)).onException(any(JMSException.class));

        conn.close();
        Connection conn2 = _connectionFactory.createConnection();
        assertEquals(2, createdConnections.size());
    }

    public void testSessionCloseException() throws Exception
    {
        _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");

        final ArgumentCaptor<ConnectionURL> connectionCaptor = ArgumentCaptor.forClass(ConnectionURL.class);
        final List<CommonConnection> createdConnections = new ArrayList<>();
        final Session session = mock(Session.class);
        doThrow(new JMSException("foo")).when(session).close();
        when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new Answer<CommonConnection>()
        {
            @Override
            public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
            {
                ConnectionURL url = connectionCaptor.getValue();

                final CommonConnection connection = mock(CommonConnection.class);
                when(connection.isClosed()).thenReturn(false);
                when(connection.createSession(anyBoolean(),anyInt())).thenReturn(session);
                createdConnections.add(connection);
                return connection;
            }
        });

        Connection conn = _connectionFactory.createConnection();
            Session createdSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        try
        {
            conn.close();
            fail("Expected an exception");
        }
        catch(JMSException e)
        {
            assertEquals("foo", e.getMessage());
        }
        Connection conn2 = _connectionFactory.createConnection();
        assertEquals(2, createdConnections.size());
    }


}
