/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.protonj2.engine.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator;
import org.apache.qpid.protonj2.codec.CodecFactory;
import org.apache.qpid.protonj2.codec.Encoder;
import org.apache.qpid.protonj2.codec.EncoderState;
import org.apache.qpid.protonj2.engine.Engine;
import org.apache.qpid.protonj2.engine.EventHandler;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.Transaction;
import org.apache.qpid.protonj2.engine.Transaction.DischargeState;
import org.apache.qpid.protonj2.engine.TransactionController;
import org.apache.qpid.protonj2.engine.TransactionState;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.engine.exceptions.EngineStateException;
import org.apache.qpid.protonj2.logging.ProtonLogger;
import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.Rejected;
import org.apache.qpid.protonj2.types.messaging.Source;
import org.apache.qpid.protonj2.types.transactions.Coordinator;
import org.apache.qpid.protonj2.types.transactions.Declare;
import org.apache.qpid.protonj2.types.transactions.Declared;
import org.apache.qpid.protonj2.types.transactions.Discharge;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.apache.qpid.protonj2.types.transport.DeliveryState.DeliveryStateType;
import org.apache.qpid.protonj2.types.transport.ErrorCondition;

/**
 * {@link TransactionController} implementation that implements the abstraction
 * around a sender link that initiates requests to {@link Declare} and to
 * {@link Discharge} AMQP {@link Transaction} instance.
 */
public class ProtonTransactionController extends ProtonEndpoint<TransactionController> implements TransactionController {

    private static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(ProtonTransactionController.class);

    private static final ProtonBuffer ENCODED_DECLARE;

    static {
        Encoder declareEncoder = CodecFactory.getEncoder();
        EncoderState state = declareEncoder.newEncoderState();

        ENCODED_DECLARE = ProtonByteBufferAllocator.DEFAULT.allocate();

        try {
            declareEncoder.writeObject(ENCODED_DECLARE, state, new AmqpValue<>(new Declare()));
        } finally {
            state.reset();
        }
    }

    private final ProtonSender senderLink;
    private final Encoder commandEncoder = CodecFactory.getEncoder();
    private final ProtonBuffer encoding = ProtonByteBufferAllocator.DEFAULT.allocate();

    private final Set<Transaction<TransactionController>> transactions = new HashSet<>();

    private EventHandler<Transaction<TransactionController>> declaredEventHandler;
    private EventHandler<Transaction<TransactionController>> declareFailureEventHandler;
    private EventHandler<Transaction<TransactionController>> dischargedEventHandler;
    private EventHandler<Transaction<TransactionController>> dischargeFailureEventHandler;

    private EventHandler<TransactionController> parentEndpointClosedEventHandler;

    private List<EventHandler<TransactionController>> capacityObservers = new ArrayList<>();

    /**
     * Creates a new {@link TransactionController} instance that wraps the given {@link Sender} link.
     *
     * @param senderLink
     * 		The {@link Sender} that this {@link TransactionController} wraps.
     */
    public ProtonTransactionController(ProtonSender senderLink) {
        super(senderLink.getEngine());

        this.senderLink = senderLink;
        this.senderLink.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
        this.senderLink.deliveryStateUpdatedHandler(this::handleDeliveryRemotelyUpdated)
                       .creditStateUpdateHandler(this::handleLinkCreditUpdated)
                       .openHandler(this::handleSenderLinkOpened)
                       .closeHandler(this::handleSenderLinkClosed)
                       .parentEndpointClosedHandler(this::handleParentEndpointClosed)
                       .localOpenHandler(this::handleSenderLinkLocallyOpened)
                       .localCloseHandler(this::handleSenderLinkLocallyClosed)
                       .engineShutdownHandler(this::handleEngineShutdown);
    }

    @Override
    public ProtonSession getParent() {
        return senderLink.getSession();
    }

    @Override
    ProtonTransactionController self() {
        return this;
    }

    @Override
    public boolean hasCapacity() {
        return senderLink.isSendable();
    }

    @Override
    public ProtonTransactionController addCapacityAvailableHandler(EventHandler<TransactionController> handler) {
        if (hasCapacity()) {
            handler.handle(this);
        } else {
            capacityObservers.add(handler);
        }

        return this;
    }

    @Override
    public Collection<Transaction<TransactionController>> transactions() {
        return Collections.unmodifiableCollection(new ArrayList<>(transactions));
    }

    @Override
    public ProtonControllerTransaction newTransaction() {
        ProtonControllerTransaction txn = new ProtonControllerTransaction(this);
        transactions.add(txn);

        return txn;
    }

    @Override
    public Transaction<TransactionController> declare() {
        if (!senderLink.isSendable()) {
            throw new IllegalStateException("Cannot Declare due to current capicity restrictions.");
        }

        final ProtonControllerTransaction transaction = newTransaction();

        declare(transaction);

        return transaction;
    }

    @Override
    public TransactionController declare(Transaction<TransactionController> transaction) {
        if (!senderLink.isSendable()) {
            throw new IllegalStateException("Cannot Declare due to current capicity restrictions.");
        }

        if (transaction.getState() != TransactionState.IDLE) {
            throw new IllegalStateException("Cannot declare a transaction that has already been used previously");
        }

        if (transaction.parent() != this) {
            throw new IllegalArgumentException("Cannot declare a transaction that was created by another controller.");
        }

        ProtonControllerTransaction protonTransaction = (ProtonControllerTransaction) transaction;

        protonTransaction.setState(TransactionState.DECLARING);

        OutgoingDelivery command = senderLink.next();

        command.setLinkedResource(protonTransaction);
        try {
            command.writeBytes(ENCODED_DECLARE);
        } finally {
            ENCODED_DECLARE.setReadIndex(0);
        }

        return this;
    }

    @Override
    public TransactionController discharge(Transaction<TransactionController> transaction, boolean failed) {
        if (transaction.getState() != TransactionState.DECLARED) {
            throw new IllegalStateException("Cannot discharge a transaction that is not currently actively declared.");
        }

        if (transaction.parent() != this) {
            throw new IllegalArgumentException("Cannot discharge a transaction that was created by another controller.");
        }

        if (!senderLink.isSendable()) {
            throw new IllegalStateException("Cannot discharge transaction due to current capicity restrictions.");
        }

        ProtonTransaction<TransactionController> protonTxn = (ProtonTransaction<TransactionController>) transaction;

        protonTxn.setState(TransactionState.DISCHARGING);
        protonTxn.setDischargeState(failed ? DischargeState.ROLLBACK : DischargeState.COMMIT);

        Discharge discharge = new Discharge();
        discharge.setFail(failed);
        discharge.setTxnId(transaction.getTxnId());

        commandEncoder.writeObject(encoding.clear(), commandEncoder.getCachedEncoderState(), new AmqpValue<>(discharge));

        OutgoingDelivery command = senderLink.next();
        command.setMessageFormat(0);
        command.setLinkedResource(transaction);
        command.writeBytes(encoding);

        return this;
    }

    @Override
    public TransactionController declaredHandler(EventHandler<Transaction<TransactionController>> declaredEventHandler) {
        this.declaredEventHandler = declaredEventHandler;
        return this;
    }

    @Override
    public TransactionController declareFailureHandler(EventHandler<Transaction<TransactionController>> declareFailureEventHandler) {
        this.declareFailureEventHandler = declareFailureEventHandler;
        return this;
    }

    @Override
    public TransactionController dischargedHandler(EventHandler<Transaction<TransactionController>> dischargedEventHandler) {
        this.dischargedEventHandler = dischargedEventHandler;
        return this;
    }

    @Override
    public TransactionController dischargeFailureHandler(EventHandler<Transaction<TransactionController>> dischargeFailureEventHandler) {
        this.dischargeFailureEventHandler = dischargeFailureEventHandler;
        return this;
    }

    @Override
    public TransactionController parentEndpointClosedHandler(EventHandler<TransactionController> handler) {
        this.parentEndpointClosedEventHandler = handler;
        return self();
    }

    private void fireParentEndpointClosed() {
        if (parentEndpointClosedEventHandler != null && isLocallyOpen()) {
            parentEndpointClosedEventHandler.handle(self());
        }
    }

    private void fireDeclaredEvent(ProtonControllerTransaction transaction) {
        if (declaredEventHandler != null) {
            declaredEventHandler.handle(transaction);
        } else {
            LOG.debug("Transaction {} declared successfully but no handler registered to signal result", transaction);
        }
    }

    private void fireDeclareFailureEvent(ProtonControllerTransaction transaction) {
        if (declareFailureEventHandler != null) {
            declareFailureEventHandler.handle(transaction);
        } else {
            LOG.debug("Transaction {} declare failed but no handler registered to signal result", transaction);
        }
    }

    private void fireDischargedEvent(ProtonControllerTransaction transaction) {
        if (dischargedEventHandler != null) {
            dischargedEventHandler.handle(transaction);
        } else {
            LOG.debug("Transaction {} discharged successfully but no handler registered to signal result", transaction);
        }
    }

    private void fireDischargeFailureEvent(ProtonControllerTransaction transaction) {
        if (dischargeFailureEventHandler != null) {
            dischargeFailureEventHandler.handle(transaction);
        } else {
            LOG.debug("Transaction {} discharge failed but no handler registered to signal result", transaction);
        }
    }

    //----- Hand off methods for link specific elements.

    @Override
    public TransactionController open() throws IllegalStateException, EngineStateException {
        senderLink.open();
        return this;
    }

    @Override
    public TransactionController close() throws EngineFailedException {
        senderLink.close();
        return this;
    }

    @Override
    public boolean isLocallyOpen() {
        return senderLink.isLocallyOpen();
    }

    @Override
    public boolean isLocallyClosed() {
        return senderLink.isLocallyClosed();
    }

    @Override
    public TransactionController setSource(Source source) throws IllegalStateException {
        senderLink.setSource(source);
        return this;
    }

    @Override
    public Source getSource() {
        return senderLink.getSource();
    }

    @Override
    public TransactionController setCoordinator(Coordinator coordinator) throws IllegalStateException {
        senderLink.setTarget(coordinator);
        return this;
    }

    @Override
    public Coordinator getCoordinator() {
        return senderLink.getTarget();
    }

    @Override
    public ErrorCondition getCondition() {
        return senderLink.getCondition();
    }

    @Override
    public TransactionController setCondition(ErrorCondition condition) {
        senderLink.setCondition(condition);
        return this;
    }

    @Override
    public Map<Symbol, Object> getProperties() {
        return senderLink.getProperties();
    }

    @Override
    public TransactionController setProperties(Map<Symbol, Object> properties) throws IllegalStateException {
        senderLink.setProperties(properties);
        return this;
    }

    @Override
    public TransactionController setOfferedCapabilities(Symbol... offeredCapabilities) throws IllegalStateException {
        senderLink.setOfferedCapabilities(offeredCapabilities);
        return this;
    }

    @Override
    public Symbol[] getOfferedCapabilities() {
        return senderLink.getOfferedCapabilities();
    }

    @Override
    public TransactionController setDesiredCapabilities(Symbol... desiredCapabilities) throws IllegalStateException {
        senderLink.setDesiredCapabilities(desiredCapabilities);
        return this;
    }

    @Override
    public Symbol[] getDesiredCapabilities() {
        return senderLink.getDesiredCapabilities();
    }

    @Override
    public boolean isRemotelyOpen() {
        return senderLink.isRemotelyOpen();
    }

    @Override
    public boolean isRemotelyClosed() {
        return senderLink.isRemotelyClosed();
    }

    @Override
    public Symbol[] getRemoteOfferedCapabilities() {
        return senderLink.getRemoteOfferedCapabilities();
    }

    @Override
    public Symbol[] getRemoteDesiredCapabilities() {
        return senderLink.getRemoteDesiredCapabilities();
    }

    @Override
    public Map<Symbol, Object> getRemoteProperties() {
        return senderLink.getRemoteProperties();
    }

    @Override
    public ErrorCondition getRemoteCondition() {
        return senderLink.getRemoteCondition();
    }

    @Override
    public Source getRemoteSource() {
        return senderLink.getRemoteSource();
    }

    @Override
    public Coordinator getRemoteCoordinator() {
        return senderLink.getRemoteTarget();
    }

    //----- Link event handlers

    private void handleSenderLinkLocallyOpened(Sender sender) {
        fireLocalOpen();
    }

    private void handleSenderLinkLocallyClosed(Sender sender) {
        fireLocalClose();
    }

    private void handleSenderLinkOpened(Sender sender) {
        fireRemoteOpen();
    }

    private void handleSenderLinkClosed(Sender sender) {
        fireRemoteClose();
    }

    private void handleParentEndpointClosed(Sender sender) {
        fireParentEndpointClosed();
    }

    private void handleEngineShutdown(Engine engine) {
        fireEngineShutdown();
    }

    private void handleLinkCreditUpdated(Sender sender) {
        if (sender.isSendable()) {
            // Remove all that can be invoked and leave the rest in place for next credit update.
            capacityObservers.removeIf(handler -> {
                if (hasCapacity()) {
                    handler.handle(this);
                    return true;
                }

                return false;
            });
        }

        if (sender.isDraining()) {
            sender.drained();
        }
    }

    private void handleDeliveryRemotelyUpdated(OutgoingDelivery delivery) {
        ProtonControllerTransaction transaction = delivery.getLinkedResource();

        DeliveryState state = delivery.getRemoteState();
        TransactionState transactionState = transaction.getState();

        try {
            switch (state.getType()) {
                case Declared:
                    Declared declared = (Declared) state;
                    transaction.setState(TransactionState.DECLARED);
                    transaction.setTxnId(declared.getTxnId());
                    fireDeclaredEvent(transaction);
                    break;
                case Accepted:
                    transaction.setState(TransactionState.DISCHARGED);
                    transactions.remove(transaction);
                    fireDischargedEvent(transaction);
                    break;
                default:
                    if (state.getType() == DeliveryStateType.Rejected) {
                        Rejected rejected = (Rejected) state;
                        transaction.setCondition(rejected.getError());
                    }

                    transactions.remove(transaction);

                    if (transactionState == TransactionState.DECLARING) {
                        transaction.setState(TransactionState.DECLARE_FAILED);
                        fireDeclareFailureEvent(transaction);
                    } else {
                        transaction.setState(TransactionState.DISCHARGE_FAILED);
                        fireDischargeFailureEvent(transaction);
                    }

                    break;
            }
        } finally {
            delivery.settle();
        }
    }

    //----- The Controller specific Transaction implementation

    private final class ProtonControllerTransaction extends ProtonTransaction<TransactionController> implements Transaction<TransactionController> {

        private final ProtonTransactionController controller;

        public ProtonControllerTransaction(ProtonTransactionController controller) {
            this.controller = controller;
        }

        @Override
        public ProtonTransactionController parent() {
            return controller;
        }
    }
}
