/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.ra;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.IllegalStateException;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.resource.spi.SecurityException;
import javax.security.auth.Subject;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQXAConnection;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper;
import org.apache.activemq.artemis.utils.VersionLoader;

/**
 * The managed connection
 */
public final class ActiveMQRAManagedConnection implements ManagedConnection, ExceptionListener {

   /**
    * Trace enabled
    */
   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();

   /**
    * The managed connection factory
    */
   private final ActiveMQRAManagedConnectionFactory mcf;

   /**
    * The connection request information
    */
   private final ActiveMQRAConnectionRequestInfo cri;

   /**
    * The resource adapter
    */
   private final ActiveMQResourceAdapter ra;

   /**
    * The user name
    */
   private final String userName;

   /**
    * The password
    */
   private final String password;

   /**
    * Has the connection been destroyed
    */
   private final AtomicBoolean isDestroyed = new AtomicBoolean(false);

   /**
    * Event listeners
    */
   private final List<ConnectionEventListener> eventListeners;

   /**
    * Handles
    */
   private final Set<ActiveMQRASession> handles;

   /**
    * Lock
    */
   private ReentrantLock lock = new ReentrantLock();

   // Physical connection stuff
   private ActiveMQConnectionFactory connectionFactory;

   private ActiveMQXAConnection connection;

   // The ManagedConnection will play with a XA and a NonXASession to couple with
   // cases where a commit is called on a non-XAed (or non-enlisted) case.
   private Session nonXAsession;

   private XASession xaSession;

   private XAResource xaResource;

   private final TransactionManager tm;

   private boolean inManagedTx;

   /**
    * Constructor
    *
    * @param mcf      The managed connection factory
    * @param cri      The connection request information
    * @param userName The user name
    * @param password The password
    */
   public ActiveMQRAManagedConnection(final ActiveMQRAManagedConnectionFactory mcf,
                                      final ActiveMQRAConnectionRequestInfo cri,
                                      final ActiveMQResourceAdapter ra,
                                      final String userName,
                                      final String password) throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("constructor(" + mcf + ", " + cri + ", " + userName + ", ****)");
      }

      this.mcf = mcf;
      this.cri = cri;
      this.tm = ra.getTM();
      this.ra = ra;
      this.userName = userName;
      this.password = password;
      eventListeners = Collections.synchronizedList(new ArrayList<ConnectionEventListener>());
      handles = Collections.synchronizedSet(new HashSet<ActiveMQRASession>());

      connection = null;
      nonXAsession = null;
      xaSession = null;
      xaResource = null;

      try {
         setup();
      }
      catch (ResourceException e) {
         try {
            destroy();
         }
         catch (Throwable ignored) {
         }

         throw e;
      }
      catch (Throwable t) {
         try {
            destroy();
         }
         catch (Throwable ignored) {
         }
         throw new ResourceException("Error during setup", t);
      }
   }

   /**
    * Get a connection
    *
    * @param subject       The security subject
    * @param cxRequestInfo The request info
    * @return The connection
    * @throws ResourceException Thrown if an error occurs
    */
   @Override
   public synchronized Object getConnection(final Subject subject,
                                            final ConnectionRequestInfo cxRequestInfo) throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getConnection(" + subject + ", " + cxRequestInfo + ")");
      }

      // Check user first
      ActiveMQRACredential credential = ActiveMQRACredential.getCredential(mcf, subject, cxRequestInfo);

      // Null users are allowed!
      if (userName != null && !userName.equals(credential.getUserName())) {
         throw new SecurityException("Password credentials not the same, reauthentication not allowed");
      }

      if (userName == null && credential.getUserName() != null) {
         throw new SecurityException("Password credentials not the same, reauthentication not allowed");
      }

      if (isDestroyed.get()) {
         throw new IllegalStateException("The managed connection is already destroyed");
      }

      ActiveMQRASession session = new ActiveMQRASession(this, (ActiveMQRAConnectionRequestInfo) cxRequestInfo);
      handles.add(session);
      return session;
   }

   /**
    * Destroy all handles.
    *
    * @throws ResourceException Failed to close one or more handles.
    */
   private void destroyHandles() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("destroyHandles()");
      }

      try {

         if (connection != null) {
            connection.stop();
         }
      }
      catch (Throwable t) {
         ActiveMQRALogger.LOGGER.trace("Ignored error stopping connection", t);
      }

      for (ActiveMQRASession session : handles) {
         session.destroy();
      }

      handles.clear();
   }

   /**
    * Destroy the physical connection.
    *
    * @throws ResourceException Could not property close the session and connection.
    */
   @Override
   public void destroy() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("destroy()");
      }

      if (isDestroyed.get() || connection == null) {
         return;
      }

      isDestroyed.set(true);

      try {
         connection.setExceptionListener(null);
      }
      catch (JMSException e) {
         ActiveMQRALogger.LOGGER.debug("Error unsetting the exception listener " + this, e);
      }
      if (connection != null) {
         connection.signalStopToAllSessions();
      }

      destroyHandles();

      try {
         /**
          * (xa|nonXA)Session.close() may NOT be called BEFORE connection.close()
          * <p>
          * If the ClientSessionFactory is trying to fail-over or reconnect with -1 attempts, and
          * one calls session.close() it may effectively dead-lock.
          * <p>
          * connection close will close the ClientSessionFactory which will close all sessions.
          */
         if (connection != null) {
            connection.close();
         }

         // The following calls should not be necessary, as the connection should close the
         // ClientSessionFactory, which will close the sessions.
         try {
            if (nonXAsession != null) {
               nonXAsession.close();
            }

            if (xaSession != null) {
               xaSession.close();
            }
         }
         catch (JMSException e) {
            ActiveMQRALogger.LOGGER.debug("Error closing session " + this, e);
         }

         // we must close the ActiveMQConnectionFactory because it contains a ServerLocator
         if (connectionFactory != null) {
            ra.closeConnectionFactory(mcf.getProperties());
         }
      }
      catch (Throwable e) {
         throw new ResourceException("Could not properly close the session and connection", e);
      }
   }

   /**
    * Cleanup
    *
    * @throws ResourceException Thrown if an error occurs
    */
   @Override
   public void cleanup() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("cleanup()");
      }

      if (isDestroyed.get()) {
         throw new IllegalStateException("ManagedConnection already destroyed");
      }

      destroyHandles();

      inManagedTx = false;

      inManagedTx = false;

      // I'm recreating the lock object when we return to the pool
      // because it looks too nasty to expect the connection handle
      // to unlock properly in certain race conditions
      // where the dissociation of the managed connection is "random".
      lock = new ReentrantLock();
   }

   /**
    * Move a handler from one mc to this one.
    *
    * @param obj An object of type ActiveMQSession.
    * @throws ResourceException     Failed to associate connection.
    * @throws IllegalStateException ManagedConnection in an illegal state.
    */
   @Override
   public void associateConnection(final Object obj) throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("associateConnection(" + obj + ")");
      }

      if (!isDestroyed.get() && obj instanceof ActiveMQRASession) {
         ActiveMQRASession h = (ActiveMQRASession) obj;
         h.setManagedConnection(this);
         handles.add(h);
      }
      else {
         throw new IllegalStateException("ManagedConnection in an illegal state");
      }
   }

   public void checkTransactionActive() throws JMSException {
      // don't bother looking at the transaction if there's an active XID
      if (!inManagedTx && tm != null) {
         try {
            Transaction tx = tm.getTransaction();
            if (tx != null) {
               int status = tx.getStatus();
               // Only allow states that will actually succeed
               if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING &&
                  status != Status.STATUS_PREPARED &&
                  status != Status.STATUS_COMMITTING) {
                  throw new javax.jms.IllegalStateException("Transaction " + tx + " not active");
               }
            }
         }
         catch (SystemException e) {
            JMSException jmsE = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction");
            jmsE.initCause(e);
            throw jmsE;
         }
      }
   }

   /**
    * Aqquire a lock on the managed connection
    */
   protected void lock() {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("lock()");
      }

      lock.lock();
   }

   /**
    * Aqquire a lock on the managed connection within the specified period
    *
    * @throws JMSException Thrown if an error occurs
    */
   protected void tryLock() throws JMSException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("tryLock()");
      }

      Integer tryLock = mcf.getUseTryLock();
      if (tryLock == null || tryLock.intValue() <= 0) {
         lock();
         return;
      }
      try {
         if (lock.tryLock(tryLock.intValue(), TimeUnit.SECONDS) == false) {
            throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
         }
      }
      catch (InterruptedException e) {
         throw new ResourceAllocationException("Interrupted attempting lock: " + this);
      }
   }

   /**
    * Unlock the managed connection
    */
   protected void unlock() {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("unlock()");
      }

      lock.unlock();
   }

   /**
    * Add a connection event listener.
    *
    * @param l The connection event listener to be added.
    */
   @Override
   public void addConnectionEventListener(final ConnectionEventListener l) {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("addConnectionEventListener(" + l + ")");
      }

      eventListeners.add(l);
   }

   /**
    * Remove a connection event listener.
    *
    * @param l The connection event listener to be removed.
    */
   @Override
   public void removeConnectionEventListener(final ConnectionEventListener l) {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("removeConnectionEventListener(" + l + ")");
      }

      eventListeners.remove(l);
   }

   /**
    * Get the XAResource for the connection.
    *
    * @return The XAResource for the connection.
    * @throws ResourceException XA transaction not supported
    */
   @Override
   public XAResource getXAResource() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getXAResource()");
      }

      //
      // Spec says a mc must always return the same XA resource,
      // so we cache it.
      //
      if (xaResource == null) {
         ClientSessionInternal csi = (ClientSessionInternal) xaSession.getXAResource();
         ActiveMQRAXAResource activeMQRAXAResource = new ActiveMQRAXAResource(this, xaSession.getXAResource());
         Map<String, Object> xaResourceProperties = new HashMap<>();
         xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_JNDI_NAME, ra.getJndiName());
         xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_NODE_ID, csi.getNodeId());
         xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_PRODUCT_NAME, ActiveMQResourceAdapter.PRODUCT_NAME);
         xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_PRODUCT_VERSION, VersionLoader.getVersion().getFullVersion());
         xaResource = ServiceUtils.wrapXAResource(activeMQRAXAResource, xaResourceProperties);
      }

      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("XAResource=" + xaResource);
      }

      return xaResource;
   }

   /**
    * Get the location transaction for the connection.
    *
    * @return The local transaction for the connection.
    * @throws ResourceException Thrown if operation fails.
    */
   @Override
   public LocalTransaction getLocalTransaction() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getLocalTransaction()");
      }

      LocalTransaction tx = new ActiveMQRALocalTransaction(this);

      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("LocalTransaction=" + tx);
      }

      return tx;
   }

   /**
    * Get the meta data for the connection.
    *
    * @return The meta data for the connection.
    * @throws ResourceException     Thrown if the operation fails.
    * @throws IllegalStateException Thrown if the managed connection already is destroyed.
    */
   @Override
   public ManagedConnectionMetaData getMetaData() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getMetaData()");
      }

      if (isDestroyed.get()) {
         throw new IllegalStateException("The managed connection is already destroyed");
      }

      return new ActiveMQRAMetaData(this);
   }

   /**
    * Set the log writer -- NOT SUPPORTED
    *
    * @param out The log writer
    * @throws ResourceException If operation fails
    */
   @Override
   public void setLogWriter(final PrintWriter out) throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("setLogWriter(" + out + ")");
      }
   }

   /**
    * Get the log writer -- NOT SUPPORTED
    *
    * @return Always null
    * @throws ResourceException If operation fails
    */
   @Override
   public PrintWriter getLogWriter() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getLogWriter()");
      }

      return null;
   }

   /**
    * Notifies user of a JMS exception.
    *
    * @param exception The JMS exception
    */
   @Override
   public void onException(final JMSException exception) {
      if (ActiveMQConnection.EXCEPTION_FAILOVER.equals(exception.getErrorCode())) {
         return;
      }
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("onException(" + exception + ")");
      }

      if (isDestroyed.get()) {
         if (ActiveMQRAManagedConnection.trace) {
            ActiveMQRALogger.LOGGER.trace("Ignoring error on already destroyed connection " + this, exception);
         }
         return;
      }

      ActiveMQRALogger.LOGGER.handlingJMSFailure(exception);

      try {
         connection.setExceptionListener(null);
      }
      catch (JMSException e) {
         ActiveMQRALogger.LOGGER.debug("Unable to unset exception listener", e);
      }

      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception);
      sendEvent(event);
   }

   /**
    * Get the session for this connection.
    *
    * @return The session
    * @throws JMSException
    */
   protected Session getSession() throws JMSException {
      if (xaResource != null && inManagedTx) {
         if (ActiveMQRAManagedConnection.trace) {
            ActiveMQRALogger.LOGGER.trace("getSession() -> XA session " + xaSession.getSession());
         }

         return xaSession.getSession();
      }
      else {
         if (ActiveMQRAManagedConnection.trace) {
            ActiveMQRALogger.LOGGER.trace("getSession() -> non XA session " + nonXAsession);
         }

         return nonXAsession;
      }
   }

   /**
    * Send an event.
    *
    * @param event The event to send.
    */
   protected void sendEvent(final ConnectionEvent event) {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("sendEvent(" + event + ")");
      }

      int type = event.getId();

      // convert to an array to avoid concurrent modification exceptions
      ConnectionEventListener[] list = eventListeners.toArray(new ConnectionEventListener[eventListeners.size()]);

      for (ConnectionEventListener l : list) {
         switch (type) {
            case ConnectionEvent.CONNECTION_CLOSED:
               l.connectionClosed(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_STARTED:
               l.localTransactionStarted(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED:
               l.localTransactionCommitted(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK:
               l.localTransactionRolledback(event);
               break;

            case ConnectionEvent.CONNECTION_ERROR_OCCURRED:
               l.connectionErrorOccurred(event);
               break;

            default:
               throw new IllegalArgumentException("Illegal eventType: " + type);
         }
      }
   }

   /**
    * Remove a handle from the handle map.
    *
    * @param handle The handle to remove.
    */
   protected void removeHandle(final ActiveMQRASession handle) {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("removeHandle(" + handle + ")");
      }

      handles.remove(handle);
   }

   /**
    * Get the request info for this connection.
    *
    * @return The connection request info for this connection.
    */
   protected ActiveMQRAConnectionRequestInfo getCRI() {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getCRI()");
      }

      return cri;
   }

   /**
    * Get the connection factory for this connection.
    *
    * @return The connection factory for this connection.
    */
   protected ActiveMQRAManagedConnectionFactory getManagedConnectionFactory() {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getManagedConnectionFactory()");
      }

      return mcf;
   }

   /**
    * Start the connection
    *
    * @throws JMSException Thrown if the connection can't be started
    */
   void start() throws JMSException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("start()");
      }

      if (connection != null) {
         connection.start();
      }
   }

   /**
    * Stop the connection
    *
    * @throws JMSException Thrown if the connection can't be stopped
    */
   void stop() throws JMSException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("stop()");
      }

      if (connection != null) {
         connection.stop();
      }
   }

   /**
    * Get the user name
    *
    * @return The user name
    */
   protected String getUserName() {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("getUserName()");
      }

      return userName;
   }

   /**
    * Setup the connection.
    *
    * @throws ResourceException Thrown if a connection couldn't be created
    */
   private void setup() throws ResourceException {
      if (ActiveMQRAManagedConnection.trace) {
         ActiveMQRALogger.LOGGER.trace("setup()");
      }

      try {

         createCF();

         boolean transacted = cri.isTransacted();
         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
         if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION) {
            if (userName != null && password != null) {
               connection = (ActiveMQXAConnection) connectionFactory.createXATopicConnection(userName, password);
            }
            else {
               connection = (ActiveMQXAConnection) connectionFactory.createXATopicConnection();
            }

            connection.setExceptionListener(this);

            xaSession = connection.createXATopicSession();
            nonXAsession = connection.createNonXATopicSession(transacted, acknowledgeMode);

         }
         else if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION) {
            if (userName != null && password != null) {
               connection = (ActiveMQXAConnection) connectionFactory.createXAQueueConnection(userName, password);
            }
            else {
               connection = (ActiveMQXAConnection) connectionFactory.createXAQueueConnection();
            }

            connection.setExceptionListener(this);

            xaSession = connection.createXAQueueSession();
            nonXAsession = connection.createNonXAQueueSession(transacted, acknowledgeMode);

         }
         else {
            if (userName != null && password != null) {
               connection = (ActiveMQXAConnection) connectionFactory.createXAConnection(userName, password);
            }
            else {
               connection = (ActiveMQXAConnection) connectionFactory.createXAConnection();
            }

            connection.setExceptionListener(this);

            xaSession = connection.createXASession();
            nonXAsession = connection.createNonXASession(transacted, acknowledgeMode);
         }

      }
      catch (JMSException je) {
         throw new ResourceException(je.getMessage(), je);
      }
   }

   private void createCF() {
      if (connectionFactory == null) {
         connectionFactory = ra.getConnectionFactory(mcf.getProperties());
      }
   }

   protected void setInManagedTx(boolean inManagedTx) {
      this.inManagedTx = inManagedTx;
   }

   public ActiveMQConnectionFactory getConnectionFactory() {
      return connectionFactory;
   }
}
