/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.smoke.brokerConnection;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.smoke.common.ContainerService;
import org.jboss.logging.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * You need to build the Artemis Centos image before this test is executed.
 * Follow the instructions under artemis-docker and build the Docker-centos image.
 */
public class QpidDispatchPeerTest extends SmokeTestBase {

   private static final Logger logger = Logger.getLogger(QpidDispatchPeerTest.class);

   static Object network;
   static Object qpidServer;
   static Object artemisServer;

   static ContainerService service = ContainerService.getService();

   @Before
   public void disableThreadcheck() {
      disableCheckThread();
   }

   private static final String QDR_HOME = basedir + "/target/brokerConnect/qdr";

   @BeforeClass
   public static void startServers() {
      try {
         Assert.assertNotNull(basedir);

         network = service.newNetwork();

         artemisServer = service.newBrokerImage();
         service.setNetwork(artemisServer, network);
         service.exposePorts(artemisServer, 61616);
         service.prepareInstance(QDR_HOME);
         service.exposeBrokerHome(artemisServer, QDR_HOME);
         service.startLogging(artemisServer, "ArtemisServer:");

         qpidServer = service.newInterconnectImage();
         service.setNetwork(qpidServer, network);
         service.exposePorts(qpidServer, 5672);
         service.exposeHosts(qpidServer, "qdr");
         service.exposeFile(qpidServer, basedir + "/src/main/resources/servers/brokerConnect/qdr/qdrouterd.conf", "/tmp/qdrouterd.conf");
         service.exposeFolder(qpidServer, basedir + "/target/brokerConnect/qdr", "/routerlog");
         service.startLogging(qpidServer, "qpid-dispatch:");
         service.start(qpidServer);

         recreateBrokerDirectory(QDR_HOME);

         service.start(artemisServer);
      } catch (Exception e) {
         e.printStackTrace();
         Assume.assumeNoException("Docker not available", e);
      }
   }

   @AfterClass
   public static void stopServer() {
      service.stop(artemisServer);
      service.stop(qpidServer);
   }

   @Test
   public void testSendReceive() throws Exception {

      int numberOfMessages = 100;

      for (int dest = 0; dest < 5; dest++) {
         {
            ConnectionFactory factoryProducer = service.createCF(qpidServer, "amqp", 5672, "?amqpIdleTimeout=1000");
            Connection connection = null;

            connection = createConnectionDumbRetry(factoryProducer);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("queue.test" + dest);
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            for (int i = 0; i < numberOfMessages; i++) {
               logger.debug("Sending " + i);
               producer.send(session.createTextMessage("hello " + i));
            }
            connection.close();
         }

         {
            ConnectionFactory factoryConsumer = service.createCF(artemisServer, "amqp", 61616, "?amqpIdleTimeout=1000");
            Connection connectionConsumer = factoryConsumer.createConnection("artemis", "artemis");
            Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queueConsumer = sessionConsumer.createQueue("queue.test" + dest);
            MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer);
            connectionConsumer.start();

            for (int i = 0; i < numberOfMessages; i++) {
               Message message = consumer.receive(5000);
               Assert.assertNotNull(message);
            }

            connectionConsumer.close();
         }

      }

   }

   @Test
   public void testSendReceiveDistinct() throws Exception {

      int numberOfMessages = 100;

      {
         ConnectionFactory factoryProducer = service.createCF(qpidServer, "amqp", 5672, "?amqpIdleTimeout=1000");
         Connection connection = null;

         connection = createConnectionDumbRetry(factoryProducer);

         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue("queue.dist");
         MessageProducer producer = session.createProducer(queue);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);

         for (int i = 0; i < numberOfMessages; i++) {
            logger.debug("Sending " + i);
            producer.send(session.createTextMessage("hello " + i));
         }
         connection.close();
      }

      {
         ConnectionFactory factoryConsumer = service.createCF(artemisServer, "amqp", 61616, "?amqpIdleTimeout=1000");
         Connection connectionConsumer = factoryConsumer.createConnection("artemis", "artemis");
         Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queueConsumer = sessionConsumer.createQueue("queue.dist::distinct.dist");
         MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer);
         connectionConsumer.start();

         for (int i = 0; i < numberOfMessages; i++) {
            Message message = consumer.receive(5000);
            Assert.assertNotNull(message);
         }

         connectionConsumer.close();
      }


   }

   private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer) throws InterruptedException {
      for (int i = 0; i < 100; i++) {
         try {
            // Some retry
            return factoryProducer.createConnection();
         } catch (Exception e) {
            Thread.sleep(10);
         }
      }
      return null;
   }

}
