/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;

/**
 * This is the Equivalent for the ServerConsumer
 */
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {

   private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);

   private static final Symbol COPY = Symbol.valueOf("copy");
   private static final Symbol TOPIC = Symbol.valueOf("topic");
   private static final Symbol QUEUE = Symbol.valueOf("queue");
   private static final Symbol SHARED = Symbol.valueOf("shared");
   private static final Symbol GLOBAL = Symbol.valueOf("global");

   SenderController controller;

   private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();

   private Consumer brokerConsumer;
   private ReadyListener onflowControlReady;
   protected final AMQPSessionContext protonSession;
   protected final Sender sender;
   protected final AMQPConnectionContext connection;
   protected boolean closed = false;
   protected final AMQPSessionCallback sessionSPI;

   private boolean preSettle;

   private final AtomicBoolean draining = new AtomicBoolean(false);

   // once a large message is accepted, we shouldn't accept any further messages
   // as large message could be interrupted due to flow control and resumed at the same message
   volatile boolean hasLarge = false;
   volatile LargeMessageDeliveryContext pendingLargeMessage = null;


   private int credits = 0;

   private AtomicInteger pending = new AtomicInteger(0);
   /**
    * The model proton uses requires us to hold a lock in certain times
    * to sync the credits we have versus the credits that are being held in proton
    * */
   private final Object creditsLock = new Object();
   private final java.util.function.Consumer<? super MessageReference> executeDelivery;
   private java.util.function.Consumer<? super MessageReference> beforeDelivery;
   private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;

   public ProtonServerSenderContext(AMQPConnectionContext connection,
                                    Sender sender,
                                    AMQPSessionContext protonSession,
                                    AMQPSessionCallback server) {
      this(connection, sender, protonSession, server, null);
   }

   public ProtonServerSenderContext(AMQPConnectionContext connection,
                                    Sender sender,
                                    AMQPSessionContext protonSession,
                                    AMQPSessionCallback server,
                                    SenderController senderController) {
      super();
      this.controller = senderController;
      this.connection = connection;
      this.sender = sender;
      this.protonSession = protonSession;
      this.sessionSPI = server;
      this.executeDelivery = this::executeDelivery;
      amqpTreatRejectAsUnmodifiedDeliveryFailed = this.connection.getProtocolManager()
                                                                 .isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
   }

   public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer<? super MessageReference> beforeDelivery) {
      this.beforeDelivery = beforeDelivery;
      return this;
   }

   public Object getBrokerConsumer() {
      return brokerConsumer;
   }

   @Override
   public void onFlow(int currentCredits, boolean drain) {
      connection.requireInHandler();

      setupCredit();

      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer;
      if (drain) {
         // If the draining is already running, then don't do anything
         if (draining.compareAndSet(false, true)) {
            final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
            serverConsumer.forceDelivery(1, new Runnable() {
               @Override
               public void run() {
                  try {
                     connection.runNow(() -> {
                        plugSender.reportDrained();
                        setupCredit();
                     });
                  } finally {
                     draining.set(false);
                  }
               }
            });
         }
      } else {
         serverConsumer.receiveCredits(-1);
      }
   }

   public boolean hasCredits() {
      if (hasLarge) {
         // we will resume accepting once the large message is finished
         return false;
      }

      if (!connection.flowControl(onflowControlReady)) {
         return false;
      }

      synchronized (creditsLock) {
         return credits > 0 && sender.getLocalState() != EndpointState.CLOSED;
      }
   }

   private void setupCredit() {
      synchronized (creditsLock) {
         this.credits = sender.getCredit() - pending.get();
         if (credits < 0) {
            credits = 0;
         }
      }
   }

   public Sender getSender() {
      return sender;
   }

   /*
    * start the session
    */
   public void start() throws ActiveMQAMQPException {
      sessionSPI.start();
      // protonSession.getServerSession().start();

      // todo add flow control
      try {
         // to do whatever you need to make the broker start sending messages to the consumer
         // this could be null if a link reattach has happened
         if (brokerConsumer != null) {
            sessionSPI.startSender(brokerConsumer);
         }
         // protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
      } catch (Exception e) {
         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
      }
   }


   /**
    * create the actual underlying ActiveMQ Artemis Server Consumer
    */
   @Override
   public void initialize() throws Exception {
      super.initialize();

      if (controller == null) {
         controller = new DefaultController(sessionSPI);
      }

      try {
         brokerConsumer = controller.init(this);
         onflowControlReady = brokerConsumer::promptDelivery;
      } catch (ActiveMQAMQPResourceLimitExceededException e1) {
         throw e1;
      } catch (ActiveMQSecurityException e) {
         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
      } catch (ActiveMQQueueMaxConsumerLimitReached e) {
         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
      } catch (ActiveMQException e) {
         throw e;
      } catch (Exception e) {
         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
      }
   }

   protected String getClientId() {
      return connection.getRemoteContainer();
   }

   /*
    * close the session
    */
   @Override
   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
      closed = true;
      if (condition != null) {
         sender.setCondition(condition);
      }
      protonSession.removeSender(sender);

      connection.runLater(() -> {
         sender.close();
         try {
            sessionSPI.closeSender(brokerConsumer);
         } catch (Exception e) {
            log.warn(e.getMessage(), e);
         }
         sender.close();
         connection.flush();
      });
   }

   /*
    * close the session
    */
   @Override
   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
      // we need to mark closed first to make sure no more adds are accepted
      closed = true;

      // MessageReferences are sent to the Connection executor (Netty Loop)
      // as a result the returning references have to be done later after they
      // had their chance to finish and clear the runnable
      connection.runLater(() -> {
         try {
            internalClose(remoteLinkClose);
         } catch (Exception e) {
            log.warn(e.getMessage(), e);
         }
      });
   }

   private void internalClose(boolean remoteLinkClose) throws ActiveMQAMQPException {
      try {
         protonSession.removeSender(sender);
         sessionSPI.closeSender(brokerConsumer);
         // if this is a link close rather than a connection close or detach, we need to delete
         // any durable resources for say pub subs
         if (remoteLinkClose) {
            controller.close();

         }
      } catch (Exception e) {
         log.warn(e.getMessage(), e);
         throw new ActiveMQAMQPInternalErrorException(e.getMessage());
      }
   }

   @Override
   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
      if (closed) {
         return;
      }

      OperationContext oldContext = sessionSPI.recoverContext();

      try {
         Message message = ((MessageReference) delivery.getContext()).getMessage();
         DeliveryState remoteState = delivery.getRemoteState();

         if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) {
            // this can happen in the twice ack mode, that is the receiver accepts and settles separately
            // acking again would show an exception but would have no negative effect but best to handle anyway.
            if (!delivery.isSettled()) {
               doAck(message);

               delivery.settle();
            }
         } else {
            handleExtendedDeliveryOutcomes(message, delivery, remoteState);
         }

         if (!preSettle) {
            protonSession.replaceTag(delivery.getTag());
         }
      } finally {
         sessionSPI.afterIO(connectionFlusher);
         sessionSPI.resetContext(oldContext);
      }
   }

   protected void doAck(Message message) throws ActiveMQAMQPIllegalStateException {
      // we have to individual ack as we can't guarantee we will get the delivery updates
      // (including acks) in order from dealer, a performance hit but a must
      try {
         sessionSPI.ack(null, brokerConsumer, message);
      } catch (Exception e) {
         log.warn(e.toString(), e);
         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
      }
   }

   private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
      boolean settleImmediate = true;
      boolean handled = true;

      if (remoteState == null) {
         log.debug("Received null disposition for delivery update: " + remoteState);
         return true;
      }

      switch (remoteState.getType()) {
         case Transactional:
            // When the message arrives with a TransactionState disposition the ack should
            // enlist the message into the transaction associated with the given txn ID.
            TransactionalState txState = (TransactionalState) remoteState;
            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);

            if (txState.getOutcome() != null) {
               settleImmediate = false;
               Outcome outcome = txState.getOutcome();
               if (outcome instanceof Accepted) {
                  if (!delivery.remotelySettled()) {
                     TransactionalState txAccepted = new TransactionalState();
                     txAccepted.setOutcome(Accepted.getInstance());
                     txAccepted.setTxnId(txState.getTxnId());
                     delivery.disposition(txAccepted);
                  }
                  // we have to individual ack as we can't guarantee we will get the delivery
                  // (including acks) in order from dealer, a performance hit but a must
                  try {
                     sessionSPI.ack(tx, brokerConsumer, message);
                     tx.addDelivery(delivery, this);
                  } catch (Exception e) {
                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
                  }
               }
            }
            break;
         case Released:
            try {
               sessionSPI.cancel(brokerConsumer, message, false);
            } catch (Exception e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
            }
            break;
         case Rejected:
            try {
               if (amqpTreatRejectAsUnmodifiedDeliveryFailed) {
                  // We could be more discriminating - for instance check for AmqpError#RESOURCE_LIMIT_EXCEEDED
                  sessionSPI.cancel(brokerConsumer, message, true);
               } else {
                  sessionSPI.reject(brokerConsumer, message);
               }
            } catch (Exception e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
            }
            break;
         case Modified:
            try {
               Modified modification = (Modified) remoteState;

               if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
                  message.rejectConsumer(brokerConsumer.sequentialID());
               }

               if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
                  sessionSPI.cancel(brokerConsumer, message, true);
               } else {
                  sessionSPI.cancel(brokerConsumer, message, false);
               }
            } catch (Exception e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
            }
            break;
         default:
            log.debug("Received null or unknown disposition for delivery update: " + remoteState);
            handled = false;
      }

      if (settleImmediate) {
         delivery.settle();
      }

      return handled;
   }

   private final class ConnectionFlushIOCallback implements IOCallback {
      @Override
      public void done() {
         connection.flush();
      }

      @Override
      public void onError(int errorCode, String errorMessage) {
         connection.flush();
      }
   }

   public void settle(Delivery delivery) {
      connection.requireInHandler();
      delivery.settle();
   }

   public synchronized void checkState() {
      sessionSPI.resumeDelivery(brokerConsumer);
   }

   /**
    * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
    */
   public int deliverMessage(final MessageReference messageReference, final ServerConsumer consumer) throws Exception {

      if (closed) {
         return 0;
      }

      if (beforeDelivery != null) {
         beforeDelivery.accept(messageReference);
      }

      try {
         synchronized (creditsLock) {
            if (sender.getLocalState() == EndpointState.CLOSED) {
               return 0;
            }
            pending.incrementAndGet();
            credits--;
         }

         if (messageReference.getMessage() instanceof AMQPLargeMessage) {
            hasLarge = true;
         }

         if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
            messageReference.onDelivery(executeDelivery);
            connection.runNow((Runnable) messageReference);
         } else {
            connection.runNow(() -> executeDelivery(messageReference));
         }

         // This is because on AMQP we only send messages based in credits, not bytes
         return 1;
      } finally {

      }
   }

   private void executeDelivery(MessageReference messageReference) {

      try {
         if (sender.getLocalState() == EndpointState.CLOSED) {
            log.debug("Not delivering message " + messageReference + " as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times");
            return;
         }
         AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage(), sessionSPI.getStorageManager());

         if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
            return;
         }
         if (message instanceof AMQPLargeMessage) {
            deliverLarge(messageReference, (AMQPLargeMessage) message);
         } else {
            deliverStandard(messageReference, message);
         }

      } catch (Exception e) {
         log.warn(e.getMessage(), e);
         brokerConsumer.errorProcessing(e, messageReference);
      }
   }

   private class LargeMessageDeliveryContext {

      LargeMessageDeliveryContext(MessageReference reference, AMQPLargeMessage message, Delivery delivery) {
         this.position = 0L;
         this.reference = reference;
         this.message = message;
         this.delivery = delivery;
      }

      long position;
      final MessageReference reference;
      final AMQPLargeMessage message;
      final Delivery delivery;

      void resume() {
         connection.runNow(this::deliver);
      }

      void deliver() {

         // This is discounting some bytes due to Transfer payload
         int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);

         DeliveryAnnotations deliveryAnnotationsToEncode;

         message.checkReference(reference);

         if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) {
            deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData();
         } else {
            deliveryAnnotationsToEncode = null;
         }

         LargeBodyReader context = message.getLargeBodyReader();
         try {
            context.open();
            try {
               context.position(position);
               long bodySize = context.getSize();

               ByteBuffer buf = ByteBuffer.allocate(frameSize);

               for (; position < bodySize; ) {
                  if (!connection.flowControl(this::resume)) {
                     context.close();
                     return;
                  }
                  buf.clear();
                  int size = 0;

                  try {
                     if (position == 0) {
                        replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf));
                     }
                     size = context.readInto(buf);

                     sender.send(new ReadableBuffer.ByteBufferReader(buf));
                     position += size;
                  } catch (java.nio.BufferOverflowException overflowException) {
                     if (position == 0) {
                        if (log.isDebugEnabled()) {
                           log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
                        }
                        // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
                        // we could recover the situation with a retry using an expandable buffer.
                        // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
                        size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf);
                     } else {
                        // if this is not the position 0, something is going on
                        // we just forward the exception as this is not supposed to happen
                        throw overflowException;
                     }
                  }

                  if (size > 0) {

                     if (position < bodySize) {
                        connection.instantFlush();
                     }
                  }
               }
            } finally {
               context.close();
            }

            if (preSettle) {
               // Presettled means the client implicitly accepts any delivery we send it.
               try {
                  sessionSPI.ack(null, brokerConsumer, reference.getMessage());
               } catch (Exception e) {
                  log.debug(e.getMessage(), e);
               }
               delivery.settle();
            } else {
               sender.advance();
            }

            connection.instantFlush();

            synchronized (creditsLock) {
               pending.decrementAndGet();
            }

            finishLargeMessage();
         } catch (Exception e) {
            log.warn(e.getMessage(), e);
            brokerConsumer.errorProcessing(e, reference);
         }
      }

      /**
       * This is a retry logic when either the delivery annotations or re-encoded buffer is bigger than the frame size
       * This will create one expandable buffer.
       * It will then let Proton to do the framing correctly
       */
      private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations deliveryAnnotationsToEncode,
                                                         LargeBodyReader context,
                                                         ByteBuffer buf) throws Exception {
         int size;
         buf.clear();
         // if the buffer overflow happened during the initial position
         // this means the replaced headers are bigger then the frame size
         // on this case we do with an expandable netty buffer
         ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2);
         try {
            replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(nettyBuffer));
            size = context.readInto(buf);
            position += size;

            nettyBuffer.writeBytes(buf);

            ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
            nioBuffer.position(nettyBuffer.writerIndex());
            nioBuffer = (ByteBuffer) nioBuffer.flip();
            sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer));
         } finally {
            nettyBuffer.release();
         }
         return size;
      }

      private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotationsToEncode,
                                        LargeBodyReader context,
                                        WritableBuffer buf) throws Exception {
         TLSEncode.getEncoder().setByteBuffer(buf);
         try {
            int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode);
            if (message.isReencoded()) {
               proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
            }

            context.position(proposedPosition);
            position = proposedPosition;
            return (int)position;
         } finally {

            TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
         }
      }

      /**
       * Write properties and application properties when the message is flagged as re-encoded.
       */
      private int writeMessageAnnotationsPropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
         int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message);
         assert bodyPosition > 0;
         writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(message);
         return bodyPosition;
      }

      private void writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
         MessageAnnotations messageAnnotations = AMQPMessageBrokerAccessor.getDecodedMessageAnnotations(message);

         if (messageAnnotations != null) {
            TLSEncode.getEncoder().writeObject(messageAnnotations);
         }

         Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message);
         if (amqpProperties != null) {
            TLSEncode.getEncoder().writeObject(amqpProperties);
         }

         ApplicationProperties applicationProperties = AMQPMessageBrokerAccessor.getDecodedApplicationProperties(message);

         if (applicationProperties != null) {
            TLSEncode.getEncoder().writeObject(applicationProperties);
         }
      }

      private int writeHeaderAndAnnotations(LargeBodyReader context,
                                             DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException {
         Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message);
         if (header != null) {
            TLSEncode.getEncoder().writeObject(header);
         }
         if (deliveryAnnotationsToEncode != null) {
            TLSEncode.getEncoder().writeObject(deliveryAnnotationsToEncode);
         }
         return message.getPositionAfterDeliveryAnnotations();
      }
   }

   private void finishLargeMessage() {
      pendingLargeMessage = null;
      hasLarge = false;
      brokerConsumer.promptDelivery();
   }

   private void deliverLarge(MessageReference messageReference, AMQPLargeMessage message) {

      // we only need a tag if we are going to settle later
      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();

      final Delivery delivery;
      delivery = sender.delivery(tag, 0, tag.length);
      delivery.setMessageFormat((int) message.getMessageFormat());
      delivery.setContext(messageReference);

      pendingLargeMessage = new LargeMessageDeliveryContext(messageReference, message, delivery);
      pendingLargeMessage.deliver();

   }

   private void deliverStandard(MessageReference messageReference, AMQPMessage message) {
      // Let the Message decide how to present the message bytes
      ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount(), messageReference);
      // we only need a tag if we are going to settle later
      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();

      boolean releaseRequired = sendBuffer instanceof NettyReadable;
      final Delivery delivery;
      delivery = sender.delivery(tag, 0, tag.length);
      delivery.setMessageFormat((int) message.getMessageFormat());
      delivery.setContext(messageReference);

      try {

         if (releaseRequired) {
            sender.send(sendBuffer);
            // Above send copied, so release now if needed
            releaseRequired = false;
            ((NettyReadable) sendBuffer).getByteBuf().release();
         } else {
            // Don't have pooled content, no need to release or copy.
            sender.sendNoCopy(sendBuffer);
         }

         if (preSettle) {
            // Presettled means the client implicitly accepts any delivery we send it.
            try {
               sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
            } catch (Exception e) {
               log.debug(e.getMessage(), e);
            }
            delivery.settle();
         } else {
            sender.advance();
         }

         connection.flush();
      } finally {
         synchronized (creditsLock) {
            pending.decrementAndGet();
         }
         if (releaseRequired) {
            ((NettyReadable) sendBuffer).getByteBuf().release();
         }
      }
   }

   private static boolean hasCapabilities(Symbol symbol, Source source) {
      if (source != null) {
         if (source.getCapabilities() != null) {
            for (Symbol cap : source.getCapabilities()) {
               if (symbol.equals(cap)) {
                  return true;
               }
            }
         }
      }
      return false;
   }

   private static boolean hasRemoteDesiredCapability(Link link, Symbol capability) {
      Symbol[] remoteDesiredCapabilities = link.getRemoteDesiredCapabilities();
      if (remoteDesiredCapabilities != null) {
         for (Symbol cap : remoteDesiredCapabilities) {
            if (capability.equals(cap)) {
               return true;
            }
         }
      }
      return false;
   }

   private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
                                               String clientId,
                                               String pubId,
                                               boolean shared,
                                               boolean global,
                                               boolean isVolatile) {
      if (useCoreSubscriptionNaming) {
         final boolean durable = !isVolatile;
         final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
         final String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId;
         return DestinationUtil.createQueueNameForSubscription(durable, clientID, subscriptionName);
      } else {
         String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
         if (shared) {
            if (queue.contains("|")) {
               queue = queue.split("\\|")[0];
            }
            if (isVolatile) {
               queue += ":shared-volatile";
            }
            if (global) {
               queue += ":global";
            }
         }
         return SimpleString.toSimpleString(queue);
      }
   }

   /**
    * Update link state to reflect that the previous drain attempt has completed.
    */
   public void reportDrained() {
      connection.requireInHandler();
      sender.drained();
      connection.instantFlush();
   }

   public AMQPSessionContext getSessionContext() {
      return protonSession;
   }

   class DefaultController implements SenderController {


      private boolean shared = false;
      boolean global = false;
      boolean multicast;
      final AMQPSessionCallback sessionSPI;
      SimpleString queue = null;
      SimpleString tempQueueName;
      String selector;

      private final RoutingType defaultRoutingType = RoutingType.ANYCAST;
      private RoutingType routingTypeToUse = RoutingType.ANYCAST;

      private boolean isVolatile = false;

      DefaultController(AMQPSessionCallback sessionSPI) {
         this.sessionSPI = sessionSPI;

      }

      @Override
      public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
         Source source = (Source) sender.getRemoteSource();
         final Map<Symbol, Object> supportedFilters = new HashMap<>();

         // Match the settlement mode of the remote instead of relying on the default of MIXED.
         sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());

         // We don't currently support SECOND so enforce that the answer is anlways FIRST
         sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);

         if (source != null) {
            // We look for message selectors on every receiver, while in other cases we might only
            // consume the filter depending on the subscription type.
            Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
            if (filter != null) {
               selector = filter.getValue().getDescribed().toString();
               // Validate the Selector.
               try {
                  SelectorParser.parse(selector);
               } catch (FilterException e) {
                  throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
               }

               supportedFilters.put(filter.getKey(), filter.getValue());
            }
         }

         if (source == null) {
            // Attempt to recover a previous subscription happens when a link reattach happens on a
            // subscription queue
            String clientId = getClientId();
            String pubId = sender.getName();
            global = hasRemoteDesiredCapability(sender, GLOBAL);
            shared = hasRemoteDesiredCapability(sender, SHARED);
            queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
            QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
            multicast = true;
            routingTypeToUse = RoutingType.MULTICAST;

            // Once confirmed that the address exists we need to return a Source that reflects
            // the lifetime policy and capabilities of the new subscription.
            if (result.isExists()) {
               source = new org.apache.qpid.proton.amqp.messaging.Source();
               source.setAddress(queue.toString());
               source.setDurable(TerminusDurability.UNSETTLED_STATE);
               source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
               source.setDistributionMode(COPY);
               source.setCapabilities(TOPIC);

               SimpleString filterString = result.getFilterString();
               if (filterString != null) {
                  selector = filterString.toString();
                  boolean noLocal = false;

                  String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
                  String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";

                  if (selector.endsWith(noLocalFilter)) {
                     if (selector.length() > noLocalFilter.length()) {
                        noLocalFilter = " AND " + noLocalFilter;
                        selector = selector.substring(0, selector.length() - noLocalFilter.length());
                     } else {
                        selector = null;
                     }

                     noLocal = true;
                  }

                  if (noLocal) {
                     supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                  }

                  if (selector != null && !selector.trim().isEmpty()) {
                     supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
                  }
               }

               sender.setSource(source);
            } else {
               throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
            }
         } else if (source.getDynamic()) {
            // if dynamic we have to create the node (queue) and set the address on the target, the
            // node is temporary and  will be deleted on closing of the session
            queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
            tempQueueName = queue;
            try {
               sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
               // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
            } catch (Exception e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
            }
            source.setAddress(queue.toString());
         } else {
            SimpleString addressToUse;
            SimpleString queueNameToUse = null;
            shared = hasCapabilities(SHARED, source);
            global = hasCapabilities(GLOBAL, source);

            //find out if we have an address made up of the address and queue name, if yes then set queue name
            if (CompositeAddress.isFullyQualified(source.getAddress())) {
               addressToUse = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
               queueNameToUse = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
            } else {
               addressToUse = SimpleString.toSimpleString(source.getAddress());
            }
            //check to see if the client has defined how we act
            boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
            if (clientDefined) {
               multicast = hasCapabilities(TOPIC, source);
               AddressQueryResult addressQueryResult = null;
               try {
                  addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
               } catch (ActiveMQSecurityException e) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
               } catch (ActiveMQAMQPException e) {
                  throw e;
               } catch (Exception e) {
                  throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
               }

               if (!addressQueryResult.isExists()) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }

               Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();

               //if the client defines 1 routing type and the broker another then throw an exception
               if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
                  throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
               } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
                  //if client specifies fully qualified name that's allowed, don't throw exception.
                  if (queueNameToUse == null) {
                     throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
                  }
               }
            } else {
               // if not we look up the address
               AddressQueryResult addressQueryResult = null;
               try {
                  addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true);
               } catch (ActiveMQSecurityException e) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
               } catch (ActiveMQAMQPException e) {
                  throw e;
               } catch (Exception e) {
                  throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
               }

               if (!addressQueryResult.isExists()) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }

               Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
               if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
                  multicast = true;
               } else {
                  //todo add some checks if both routing types are supported
                  multicast = false;
               }
            }
            routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
            // if not dynamic then we use the target's address as the address to forward the
            // messages to, however there has to be a queue bound to it so we need to check this.
            if (multicast) {
               Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
               if (filter != null) {
                  String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
                  String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                  if (selector != null) {
                     selector += " AND " + noLocalFilter;
                  } else {
                     selector = noLocalFilter;
                  }

                  supportedFilters.put(filter.getKey(), filter.getValue());
               }

               queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
               SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);

               //if the address specifies a broker configured queue then we always use this, treat it as a queue
               if (queue != null) {
                  multicast = false;
               } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {

                  // if we are a subscription and durable create a durable queue using the container
                  // id and link name
                  String clientId = getClientId();
                  String pubId = sender.getName();
                  queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
                  QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
                  if (result.isExists()) {
                     /*
                      * If a client reattaches to a durable subscription with a different filter or address then we must
                      * recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
                      * configuration then we don't want to change it
                      */
                     if (!result.isConfigurationManaged() && (!Objects.equals(result.getAddress(), addressToUse) || !Objects.equals(result.getFilterString(), simpleStringSelector))) {

                        if (result.getConsumerCount() == 0) {
                           sessionSPI.deleteQueue(queue);
                           sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                        } else {
                           throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                        }
                     }
                  } else {
                     if (shared) {
                        sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                     } else {
                        sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                     }
                  }
               } else {
                  // otherwise we are a volatile subscription
                  isVolatile = true;
                  if (shared && sender.getName() != null) {
                     queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
                     QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
                     if ((!result.isExists() || !Objects.equals(result.getAddress(), addressToUse) || !Objects.equals(result.getFilterString(), simpleStringSelector)) && !result.isConfigurationManaged()) {
                        sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                     }
                  } else {
                     queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
                     tempQueueName = queue;
                     try {
                        sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
                     } catch (Exception e) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                     }
                  }
               }
            } else {
               if (queueNameToUse != null) {
                  //a queue consumer can receive from a multicast queue if it uses a fully qualified name
                  //setting routingType to null means do not check the routingType against the Queue's routing type.
                  routingTypeToUse = null;
                  SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null);
                  if (matchingAnycastQueue != null) {
                     queue = matchingAnycastQueue;
                  } else {
                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                  }
               } else {
                  SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
                  if (matchingAnycastQueue != null) {
                     queue = matchingAnycastQueue;
                  } else {
                     queue = addressToUse;
                  }
               }

            }

            if (queue == null) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
            }

            try {
               if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }
            } catch (ActiveMQAMQPNotFoundException e) {
               throw e;
            } catch (Exception e) {
               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }
         }

         // Detect if sender is in pre-settle mode.
         preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;

         // We need to update the source with any filters we support otherwise the client
         // is free to consider the attach as having failed if we don't send back what we
         // do support or if we send something we don't support the client won't know we
         // have not honored what it asked for.
         source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);

         boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);

         return (Consumer) sessionSPI.createSender(senderContext, queue, multicast ? null : selector, browseOnly);
      }


      private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
         if (queueName != null) {
            QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true);
            if (!result.isExists()) {
               throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
            } else {
               if (!result.getAddress().equals(address)) {
                  throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
               }
               return sessionSPI.getMatchingQueue(address, queueName, routingType);
            }
         }
         return null;
      }


      @Override
      public void close() throws Exception {
         Source source = (Source) sender.getSource();
         if (source != null && source.getAddress() != null && multicast) {
            SimpleString queueName = SimpleString.toSimpleString(source.getAddress());
            QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
            if (result.isExists() && source.getDynamic()) {
               sessionSPI.deleteQueue(queueName);
            } else {
               if (source.getDurable() == TerminusDurability.NONE && tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
                  sessionSPI.removeTemporaryQueue(tempQueueName);
               } else {
                  String clientId = getClientId();
                  String pubId = sender.getName();
                  if (pubId.contains("|")) {
                     pubId = pubId.split("\\|")[0];
                  }
                  SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
                  result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
                  //only delete if it isn't volatile and has no consumers
                  if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
                     sessionSPI.deleteQueue(queue);
                  }
               }
            }
         } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
            try {
               sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
            } catch (Exception e) {
               //ignore on close, its temp anyway and will be removed later
            }
         }
      }

   }
}
