/* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.qpid.test.unit.xa;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XAQueueSession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueTest extends AbstractXATestCase
{
    private static final Logger _logger = LoggerFactory.getLogger(QueueTest.class);

    /**
     * the queue use by all the tests
     */
    private static Queue _queue = null;
    /**
     * the queue connection factory used by all tests
     */
    private static XAQueueConnectionFactory _queueFactory = null;

    /**
     * standard xa queue connection
     */
    private static XAQueueConnection _xaqueueConnection = null;

    /**
     * standard xa queue connection
     */
    private static QueueConnection _queueConnection = null;


    /**
     * standard queue session created from the standard connection
     */
    private static QueueSession _nonXASession = null;


    /** ----------------------------------------------------------------------------------- **/
    /**
     * ----------------------------- JUnit support  ----------------------------------------- *
     */

    @Override
    public void tearDown() throws Exception
    {
        try
        {
            if (_xaqueueConnection != null)
            {
                _xaqueueConnection.close();
            }
            if (_queueConnection != null)
            {
                _queueConnection.close();
            }
        }
        finally
        {
            super.tearDown();
        }
    }

    /**
     * Initialize standard actors
     */
    public void init() throws Exception
    {
        if (!isBroker08())
        {
            _queueFactory = (XAQueueConnectionFactory) getConnectionFactory();
            _xaqueueConnection= getNewQueueXAConnection();
            final XAQueueSession session = _xaqueueConnection.createXAQueueSession();
            _queue = session.createQueue(getTestQueueName());
            _queueConnection = _queueFactory.createQueueConnection(GUEST_USERNAME, GUEST_PASSWORD);
            _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            init(session, _queue);
        }
    }

    /** -------------------------------------------------------------------------------------- **/
    /** ----------------------------- Test Suite  -------------------------------------------- **/
    /** -------------------------------------------------------------------------------------- **/

    /**
     * Uses two transactions respectively with xid1 and xid2 that are used to send a message
     * within xid1 and xid2.  xid2 is committed and xid1 is used to receive the message that was sent within xid2.
     * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1.
     */
    public void testProducer()
    {
        if (!isBroker08())
        {
            _logger.debug("running testProducer");
            Xid xid1 = getNewXid();
            Xid xid2 = getNewXid();
            // start the xaResource for xid1
            try
            {
                _xaResource.start(xid1, XAResource.TMNOFLAGS);
            }
            catch (XAException e)
            {
                _logger.error("cannot start the transaction with xid1", e);
                fail("cannot start the transaction with xid1: " + e.getMessage());
            }
            try
            {
                // start the connection
                _xaqueueConnection.start();
                // produce a message with sequence number 1
                _message.setLongProperty(_sequenceNumberPropertyName, 1);
                _producer.send(_message);
            }
            catch (JMSException e)
            {
                fail(" cannot send persistent message: " + e.getMessage());
            }
            // suspend the transaction
            try
            {
                _xaResource.end(xid1, XAResource.TMSUSPEND);
            }
            catch (XAException e)
            {
                fail("Cannot end the transaction with xid1: " + e.getMessage());
            }
            // start the xaResource for xid2
            try
            {
                _xaResource.start(xid2, XAResource.TMNOFLAGS);
            }
            catch (XAException e)
            {
                fail("cannot start the transaction with xid2: " + e.getMessage());
            }
            try
            {
                // produce a message
                _message.setLongProperty(_sequenceNumberPropertyName, 2);
                _producer.send(_message);
            }
            catch (JMSException e)
            {
                fail(" cannot send second persistent message: " + e.getMessage());
            }
            // end xid2 and start xid1
            try
            {
                _xaResource.end(xid2, XAResource.TMSUCCESS);
                _xaResource.start(xid1, XAResource.TMRESUME);
            }
            catch (XAException e)
            {
                fail("Exception when ending and starting transactions: " + e.getMessage());
            }
            // two phases commit transaction with xid2
            try
            {
                int resPrepare = _xaResource.prepare(xid2);
                if (resPrepare != XAResource.XA_OK)
                {
                    fail("prepare returned: " + resPrepare);
                }
                _xaResource.commit(xid2, false);
            }
            catch (XAException e)
            {
                fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
            }
            // receive a message from queue test we expect it to be the second one
            try
            {
                TextMessage message = (TextMessage) _consumer.receive(1000);
                if (message == null)
                {
                    fail("did not receive second message as expected ");
                }
                else
                {
                    if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
                    {
                        fail("receive wrong message its sequence number is: " + message
                                .getLongProperty(_sequenceNumberPropertyName));
                    }
                }
            }
            catch (JMSException e)
            {
                fail("Exception when receiving second message: " + e.getMessage());
            }
            // end and one phase commit the first transaction
            try
            {
                _xaResource.end(xid1, XAResource.TMSUCCESS);
                _xaResource.commit(xid1, true);
            }
            catch (XAException e)
            {
                fail("Exception thrown when commiting transaction with xid1");
            }
            // We should now be able to receive the first message
            try
            {
                _xaqueueConnection.close();
                Session nonXASession = _nonXASession;
                MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
                _queueConnection.start();
                TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
                if (message1 == null)
                {
                    fail("did not receive first message as expected ");
                }
                else
                {
                    if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
                    {
                        fail("receive wrong message its sequence number is: " + message1
                                .getLongProperty(_sequenceNumberPropertyName));
                    }
                }
                // commit that transacted session
                nonXASession.commit();
                // the queue should be now empty
                message1 = (TextMessage) nonXAConsumer.receive(1000);
                if (message1 != null)
                {
                    fail("receive an unexpected message ");
                }
            }
            catch (JMSException e)
            {
                fail("Exception thrown when emptying the queue: " + e.getMessage());
            }
        }
    }

    /**
     * strategy: Produce a message within Tx1 and prepare tx1. crash the server then commit tx1 and consume the message
     */
    public void testSendAndRecover()
    {
        if (!isBroker08())
        {
            _logger.debug("running testSendAndRecover");
            Xid xid1 = getNewXid();
            // start the xaResource for xid1
            try
            {
                _xaResource.start(xid1, XAResource.TMNOFLAGS);
            }
            catch (XAException e)
            {
                fail("cannot start the transaction with xid1: " + e.getMessage());
            }
            try
            {
                // start the connection
                _xaqueueConnection.start();
                // produce a message with sequence number 1
                _message.setLongProperty(_sequenceNumberPropertyName, 1);
                _producer.send(_message);
            }
            catch (JMSException e)
            {
                fail(" cannot send persistent message: " + e.getMessage());
            }
            // suspend the transaction
            try
            {
                _xaResource.end(xid1, XAResource.TMSUCCESS);
            }
            catch (XAException e)
            {
                fail("Cannot end the transaction with xid1: " + e.getMessage());
            }
            // prepare the transaction with xid1
            try
            {
                _xaResource.prepare(xid1);
            }
            catch (XAException e)
            {
                fail("Exception when preparing xid1: " + e.getMessage());
            }

            /////// stop the server now !!
            try
            {
                _logger.debug("stopping broker");
                restartDefaultBroker();
                init();
            }
            catch (Exception e)
            {
                fail("Exception when stopping and restarting the server");
            }

            // get the list of in doubt transactions
            try
            {
                Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
                if (inDoubt == null)
                {
                    fail("the array of in doubt transactions should not be null ");
                }
                // At that point we expect only two indoubt transactions:
                if (inDoubt.length != 1)
                {
                    fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
                }

                // commit them
                for (Xid anInDoubt : inDoubt)
                {
                    if (anInDoubt.equals(xid1))
                    {
                        _logger.info("commit xid1 ");
                        try
                        {
                            _xaResource.commit(anInDoubt, false);
                        }
                        catch (Exception e)
                        {
                            _logger.error("PB when aborted xid1", e);
                        }
                    }
                    else
                    {
                        fail("did not receive right xid ");
                    }
                }
            }
            catch (XAException e)
            {
                _logger.error("exception thrown when recovering transactions", e);
                fail("exception thrown when recovering transactions " + e.getMessage());
            }
            // the queue should contain the first message!
            try
            {
                _xaqueueConnection.close();
                Session nonXASession = _nonXASession;
                MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
                _queueConnection.start();
                TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);

                if (message1 == null)
                {
                    fail("queue does not contain any message!");
                }
                if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
                {
                    fail("Wrong message returned! Sequence number is " + message1
                            .getLongProperty(_sequenceNumberPropertyName));
                }
                nonXASession.commit();
            }
            catch (JMSException e)
            {
                fail("Exception thrown when testin that queue test is not empty: " + e.getMessage());
            }
        }
    }

    /**
     * strategy: Produce a message within Tx1 and prepare tx1. Produce a standard message and consume
     * it within tx2 and prepare tx2. Shutdown the server and get the list of in doubt transactions:
     * we expect tx1 and tx2! Then, Tx1 is aborted and tx2 is committed so we expect the test's queue to be empty!
     */
    public void testRecover()
    {
        if (!isBroker08())
        {
            _logger.debug("running testRecover");
            Xid xid1 = getNewXid();
            Xid xid2 = getNewXid();
            // start the xaResource for xid1
            try
            {
                _xaResource.start(xid1, XAResource.TMNOFLAGS);
            }
            catch (XAException e)
            {
                fail("cannot start the transaction with xid1: " + e.getMessage());
            }
            try
            {
                // start the connection
                _xaqueueConnection.start();
                // produce a message with sequence number 1
                _message.setLongProperty(_sequenceNumberPropertyName, 1);
                _producer.send(_message);
            }
            catch (JMSException e)
            {
                fail(" cannot send persistent message: " + e.getMessage());
            }
            // suspend the transaction
            try
            {
                _xaResource.end(xid1, XAResource.TMSUCCESS);
            }
            catch (XAException e)
            {
                fail("Cannot end the transaction with xid1: " + e.getMessage());
            }
            // prepare the transaction with xid1
            try
            {
                _xaResource.prepare(xid1);
            }
            catch (XAException e)
            {
                fail("Exception when preparing xid1: " + e.getMessage());
            }

            // send a message using the standard session
            try
            {
                Session nonXASession = _nonXASession;
                MessageProducer nonXAProducer = nonXASession.createProducer(_queue);
                TextMessage message2 = nonXASession.createTextMessage();
                message2.setText("non XA ");
                message2.setLongProperty(_sequenceNumberPropertyName, 2);
                nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
                nonXAProducer.send(message2);
                // commit that transacted session
                nonXASession.commit();
            }
            catch (Exception e)
            {
                fail("Exception thrown when emptying the queue: " + e.getMessage());
            }
            // start the xaResource for xid2
            try
            {
                _xaResource.start(xid2, XAResource.TMNOFLAGS);
            }
            catch (XAException e)
            {
                fail("cannot start the transaction with xid1: " + e.getMessage());
            }
            // receive a message from queue test we expect it to be the second one
            try
            {
                TextMessage message = (TextMessage) _consumer.receive(1000);
                if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2)
                {
                    fail("did not receive second message as expected ");
                }
            }
            catch (JMSException e)
            {
                fail("Exception when receiving second message: " + e.getMessage());
            }
            // suspend the transaction
            try
            {
                _xaResource.end(xid2, XAResource.TMSUCCESS);
            }
            catch (XAException e)
            {
                fail("Cannot end the transaction with xid2: " + e.getMessage());
            }
            // prepare the transaction with xid1
            try
            {
                _xaResource.prepare(xid2);
            }
            catch (XAException e)
            {
                fail("Exception when preparing xid2: " + e.getMessage());
            }

            /////// stop the server now !!
            try
            {
                _logger.debug("stopping broker");
                restartDefaultBroker();
                init();
            }
            catch (Exception e)
            {
                fail("Exception when stopping and restarting the server");
            }

            // get the list of in doubt transactions
            try
            {
                Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
                if (inDoubt == null)
                {
                    fail("the array of in doubt transactions should not be null ");
                }
                // At that point we expect only two indoubt transactions:
                if (inDoubt.length != 2)
                {
                    fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
                }

                // commit them
                for (Xid anInDoubt : inDoubt)
                {
                    if (anInDoubt.equals(xid1))
                    {
                         _logger.debug("rollback xid1 ");
                        try
                        {
                            _xaResource.rollback(anInDoubt);
                        }
                        catch (Exception e)
                        {
                            _logger.error("PB when aborted xid1", e);
                        }
                    }
                    else if (anInDoubt.equals(xid2))
                    {
                        _logger.debug("commit xid2 ");
                        try
                        {
                            _xaResource.commit(anInDoubt, false);
                        }
                        catch (Exception e)
                        {
                            _logger.error("PB when commiting xid2", e);
                        }
                    }
                }
            }
            catch (XAException e)
            {
                _logger.error("exception thrown when recovering transactions", e);
                fail("exception thrown when recovering transactions " + e.getMessage());
            }
            // the queue should be empty
            try
            {
                _xaqueueConnection.close();
                Session nonXASession = _nonXASession;
                MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
                _queueConnection.start();
                TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
                if (message1 != null)
                {

                    fail("The queue is not empty! " + message1.getLongProperty(_sequenceNumberPropertyName));
                }
            }
            catch (JMSException e)
            {
                fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
            }
        }
    }

    /** -------------------------------------------------------------------------------------- **/
    /** ----------------------------- Utility methods  --------------------------------------- **/
    /** -------------------------------------------------------------------------------------- **/

    /**
     * get a new queue connection
     *
     * @return a new queue connection
     * @throws JMSException If the JMS provider fails to create the queue connection
     *                      due to some internal error or in case of authentication failure
     */
    private XAQueueConnection getNewQueueXAConnection() throws JMSException
    {
        return _queueFactory.createXAQueueConnection("guest", "guest");
    }


}
