/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.core.transaction.impl;

import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionTimeoutException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.jboss.logging.Logger;

public class TransactionImpl implements Transaction {

   private static final Logger logger = Logger.getLogger(TransactionImpl.class);

   private List<TransactionOperation> operations;

   private List<TransactionOperation> storeOperations;

   private static final int INITIAL_NUM_PROPERTIES = 10;

   private Object[] properties = null;

   protected final StorageManager storageManager;

   private final Xid xid;

   private final long id;

   private volatile State state = State.ACTIVE;

   private ActiveMQException exception;

   private final Object timeoutLock = new Object();

   private final long createTime;

   private volatile boolean containsPersistent;

   private int timeoutSeconds = -1;

   private Object protocolData;

   private void ensurePropertiesCapacity(int capacity) {
      if (properties != null && properties.length >= capacity) {
         return;
      }
      createOrEnlargeProperties(capacity);
   }

   private void createOrEnlargeProperties(int capacity) {
      if (properties == null) {
         properties = new Object[Math.min(TransactionImpl.INITIAL_NUM_PROPERTIES, capacity)];
      } else {
         assert properties.length < capacity;
         properties = Arrays.copyOf(properties, capacity);
      }
   }

   @Override
   public Object getProtocolData() {
      return protocolData;
   }

   @Override
   public void setProtocolData(Object protocolData) {
      this.protocolData = protocolData;
   }

   public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
      this(storageManager.generateID(), null, storageManager, timeoutSeconds);
   }

   public TransactionImpl(final StorageManager storageManager) {
      this(storageManager.generateID(), null, storageManager,-1);
   }


   public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
      this(storageManager.generateID(), xid, storageManager, timeoutSeconds);
   }

   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
      this(id, xid, storageManager, -1);
   }

   private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
      this.storageManager = storageManager;

      this.xid = xid;

      this.id = id;

      this.createTime = System.currentTimeMillis();

      this.timeoutSeconds = timeoutSeconds;
   }

   // Transaction implementation
   // -----------------------------------------------------------

   @Override
   public boolean isEffective() {
      return state == State.PREPARED || state == State.COMMITTED || state == State.ROLLEDBACK;
   }

   @Override
   public void setContainsPersistent() {
      containsPersistent = true;
   }

   @Override
   public boolean isContainsPersistent() {
      return containsPersistent;
   }

   @Override
   public void setTimeout(final int timeout) {
      this.timeoutSeconds = timeout;
   }

   @Override
   public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
      return new RefsOperation(queue, reason, storageManager);
   }

   @Override
   public long getID() {
      return id;
   }

   @Override
   public long getCreateTime() {
      return createTime;
   }

   @Override
   public boolean hasTimedOut(final long currentTime, final int defaultTimeout) {
      synchronized (timeoutLock) {
         boolean timedout;
         if (timeoutSeconds == -1) {
            timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + (long) defaultTimeout * 1000;
         } else {
            timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + (long) timeoutSeconds * 1000;
         }

         if (timedout) {
            markAsRollbackOnly(new ActiveMQTransactionTimeoutException());
         }

         return timedout;
      }
   }

   @Override
   public boolean hasTimedOut() {
      return state == State.ROLLBACK_ONLY && exception != null && exception.getType() == ActiveMQExceptionType.TRANSACTION_TIMEOUT;
   }

   @Override
   public void prepare() throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("TransactionImpl::prepare::" + this);
      }
      try (ArtemisCloseable lock = storageManager.closeableReadLock()) {
         synchronized (timeoutLock) {
            if (isEffective()) {
               logger.debug("TransactionImpl::prepare::" + this + " is being ignored");
               return;
            }
            if (state == State.ROLLBACK_ONLY) {
               if (logger.isTraceEnabled()) {
                  logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
               }

               internalRollback();

               if (exception != null) {
                  throw exception;
               } else {
                  // Do nothing
                  return;
               }
            } else if (state != State.ACTIVE) {
               throw new IllegalStateException("Transaction is in invalid state " + state);
            }

            if (xid == null) {
               throw new IllegalStateException("Cannot prepare non XA transaction");
            }

            beforePrepare();

            storageManager.prepare(id, xid);

            state = State.PREPARED;
            // We use the Callback even for non persistence
            // If we are using non-persistence with replication, the replication manager will have
            // to execute this runnable in the correct order
            storageManager.afterCompleteOperations(new IOCallback() {

               @Override
               public void onError(final int errorCode, final String errorMessage) {
                  ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
               }

               @Override
               public void done() {
                  afterPrepare();
               }
            });
         }
      }
   }

   @Override
   public void commit() throws Exception {
      commit(true);
   }

   @Override
   public void commit(final boolean onePhase) throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("TransactionImpl::commit::" + this);
      }
      synchronized (timeoutLock) {
         if (state == State.COMMITTED) {
            // I don't think this could happen, but just in case
            logger.debug("TransactionImpl::commit::" + this + " is being ignored");
            return;
         }
         if (state == State.ROLLBACK_ONLY) {
            internalRollback();

            if (exception != null) {
               throw exception;
            } else {
               // Do nothing
               return;
            }
         }

         if (xid != null) {
            if (onePhase && state != State.ACTIVE || !onePhase && state != State.PREPARED) {
               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
            }
         } else {
            if (state != State.ACTIVE) {
               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
            }
         }

         beforeCommit();

         doCommit();

         // We want to make sure that nothing else gets done after the commit is issued
         // this will eliminate any possibility or races
         final List<TransactionOperation> operationsToComplete = this.operations;
         this.operations = null;

         // We use the Callback even for non persistence
         // If we are using non-persistence with replication, the replication manager will have
         // to execute this runnable in the correct order
         // This also will only use a different thread if there are any IO pending.
         // If the IO finished early by the time we got here, we won't need an executor
         storageManager.afterCompleteOperations(new IOCallback() {

            @Override
            public void onError(final int errorCode, final String errorMessage) {
               ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
            }

            @Override
            public void done() {
               afterCommit(operationsToComplete);
            }
         });

         final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
         this.storeOperations = null;

         if (storeOperationsToComplete != null) {
            storageManager.afterStoreOperations(new IOCallback() {

               @Override
               public void onError(final int errorCode, final String errorMessage) {
                  ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
               }

               @Override
               public void done() {
                  afterCommit(storeOperationsToComplete);
               }
            });
         }

      }
   }

   /**
    * @throws Exception
    */
   protected void doCommit() throws Exception {
      if (containsPersistent || xid != null && state == State.PREPARED) {
         // ^^ These are the scenarios where we require a storage.commit
         // for anything else we won't use the journal
         storageManager.commit(id);
      }

      state = State.COMMITTED;
   }

   @Override
   public boolean tryRollback() {
      synchronized (timeoutLock) {
         if (state == State.ROLLEDBACK) {
            // I don't think this could happen, but just in case
            logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored");
            return true;
         }
         if (state != State.PREPARED) {
            try {
               internalRollback();
               return true;
            } catch (Exception e) {
               // nothing we can do beyond logging
               // no need to special handler here as this was not even supposed to happen at this point
               // even if it happenes this would be the exception of the exception, so we just log here
               logger.warn(e.getMessage(), e);
            }
         }
      }
      return false;
   }

   @Override
   public void rollback() throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("TransactionImpl::rollback::" + this);
      }

      synchronized (timeoutLock) {
         if (state == State.ROLLEDBACK) {
            // I don't think this could happen, but just in case
            logger.debug("TransactionImpl::rollback::" + this + " is being ignored");
            return;
         }
         if (xid != null) {
            if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
            }
         } else {
            if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
            }
         }

         internalRollback();
      }
   }

   private void internalRollback() throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("TransactionImpl::internalRollback " + this);
      }

      beforeRollback();

      try {
         doRollback();
         state = State.ROLLEDBACK;
      } catch (IllegalStateException e) {
         // Something happened before and the TX didn't make to the Journal / Storage
         // We will like to execute afterRollback and clear anything pending
         ActiveMQServerLogger.LOGGER.failedToPerformRollback(e);
      }
      // We want to make sure that nothing else gets done after the commit is issued
      // this will eliminate any possibility or races
      final List<TransactionOperation> operationsToComplete = this.operations;
      this.operations = null;

      final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
      this.storeOperations = null;

      // We use the Callback even for non persistence
      // If we are using non-persistence with replication, the replication manager will have
      // to execute this runnable in the correct order
      storageManager.afterCompleteOperations(new IOCallback() {

         @Override
         public void onError(final int errorCode, final String errorMessage) {
            ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
         }

         @Override
         public void done() {
            afterRollback(operationsToComplete);
         }
      });

      if (storeOperationsToComplete != null) {
         storageManager.afterStoreOperations(new IOCallback() {

            @Override
            public void onError(final int errorCode, final String errorMessage) {
               ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
            }

            @Override
            public void done() {
               afterRollback(storeOperationsToComplete);
            }
         });
      }
   }

   @Override
   public void suspend() {
      synchronized (timeoutLock) {
         if (state != State.ACTIVE) {
            throw new IllegalStateException("Can only suspend active transaction");
         }
         state = State.SUSPENDED;
      }
   }

   @Override
   public void resume() {
      synchronized (timeoutLock) {
         if (state != State.SUSPENDED) {
            throw new IllegalStateException("Can only resume a suspended transaction");
         }
         state = State.ACTIVE;
      }
   }

   @Override
   public Transaction.State getState() {
      return state;
   }

   @Override
   public void setState(final State state) {
      this.state = state;
   }

   @Override
   public Xid getXid() {
      return xid;
   }

   @Override
   public void markAsRollbackOnly(final ActiveMQException exception) {
      synchronized (timeoutLock) {
         if (logger.isTraceEnabled()) {
            logger.trace("TransactionImpl::" + this + " marking rollbackOnly for " + exception.toString() + ", msg=" + exception.getMessage());
         }

         if (isEffective()) {
            logger.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared, committed or rolledback!)");
            return;
         }

         if (logger.isDebugEnabled()) {
            logger.debug("Marking Transaction " + this.id + " as rollback only");
         }
         state = State.ROLLBACK_ONLY;

         this.exception = exception;
      }
   }

   @Override
   public synchronized void addOperation(final TransactionOperation operation) {
      checkCreateOperations();

      operations.add(operation);
   }

   @Override
   public synchronized void afterStore(TransactionOperation sync) {
      if (storeOperations == null) {
         storeOperations = new LinkedList<>();
      }
      storeOperations.add(sync);
   }

   private int getOperationsCount() {
      checkCreateOperations();

      return operations.size();
   }

   @Override
   public synchronized List<TransactionOperation> getAllOperations() {

      if (operations != null) {
         return new ArrayList<>(operations);
      } else {
         return new ArrayList<>();
      }
   }

   @Override
   public void putProperty(final int index, final Object property) {
      ensurePropertiesCapacity(index + 1);

      properties[index] = property;
   }

   @Override
   public Object getProperty(final int index) {
      return properties == null ? null : (index < properties.length ? properties[index] : null);
   }

   // Private
   // -------------------------------------------------------------------

   protected void doRollback() throws Exception {
      if (containsPersistent || xid != null && state == State.PREPARED) {
         storageManager.rollback(id);
      }
   }

   private void checkCreateOperations() {
      if (operations == null) {
         operations = new LinkedList<>();
      }
   }

   protected synchronized void afterCommit(List<TransactionOperation> operationsToComplete) {
      if (operationsToComplete != null) {
         for (TransactionOperation operation : operationsToComplete) {
            operation.afterCommit(this);
         }
         // Help out GC here
         operationsToComplete.clear();
      }
   }

   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
      if (operationsToComplete != null) {
         for (TransactionOperation operation : operationsToComplete) {
            operation.afterRollback(this);
         }
         // Help out GC here
         operationsToComplete.clear();
      }
   }

   private synchronized void beforeCommit() throws Exception {
      if (operations != null) {
         for (TransactionOperation operation : operations) {
            operation.beforeCommit(this);
         }
      }
      if (storeOperations != null) {
         for (TransactionOperation operation : storeOperations) {
            operation.beforeCommit(this);
         }
      }
   }

   private synchronized void beforePrepare() throws Exception {
      if (operations != null) {
         for (TransactionOperation operation : operations) {
            operation.beforePrepare(this);
         }
      }
      if (storeOperations != null) {
         for (TransactionOperation operation : storeOperations) {
            operation.beforePrepare(this);
         }
      }
   }

   private synchronized void beforeRollback() throws Exception {
      if (operations != null) {
         for (TransactionOperation operation : operations) {
            operation.beforeRollback(this);
         }
      }
      if (storeOperations != null) {
         for (TransactionOperation operation : storeOperations) {
            operation.beforeRollback(this);
         }
      }
   }

   private synchronized void afterPrepare() {
      if (operations != null) {
         for (TransactionOperation operation : operations) {
            operation.afterPrepare(this);
         }
      }
      if (storeOperations != null) {
         for (TransactionOperation operation : storeOperations) {
            operation.afterPrepare(this);
         }
      }
   }

   @Override
   public String toString() {
      Date dt = new Date(this.createTime);
      return "TransactionImpl [xid=" + xid +
         ", txID=" +
         id +
         ", xid=" + xid +
         ", state=" +
         state +
         ", createTime=" +
         createTime + "(" + dt + ")" +
         ", timeoutSeconds=" +
         timeoutSeconds +
         ", nr operations = " + getOperationsCount() +
         "]@" +
         Integer.toHexString(hashCode());
   }
}
