/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.core.server.impl;

import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToLongFunction;

import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;

public class ScheduledDeliveryHandlerTest extends Assert {

   private static final Logger log = Logger.getLogger(ScheduledDeliveryHandlerTest.class);

   @Test
   public void testScheduleRandom() throws Exception {
      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));

      long nextMessage = 0;
      long NUMBER_OF_SEQUENCES = 100000;

      for (int i = 0; i < NUMBER_OF_SEQUENCES; i++) {
         int numberOfMessages = RandomUtil.randomInt() % 10;
         if (numberOfMessages == 0)
            numberOfMessages = 1;

         long nextScheduledTime = RandomUtil.randomPositiveLong();

         for (int j = 0; j < numberOfMessages; j++) {
            boolean tail = RandomUtil.randomBoolean();

            addMessage(handler, nextMessage++, nextScheduledTime, tail);
         }
      }

      debugList(true, handler, nextMessage);

   }

   @Test
   public void testScheduleSameTimeHeadAndTail() throws Exception {
      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));

      long time = System.currentTimeMillis() + 10000;
      for (int i = 10001; i < 20000; i++) {
         addMessage(handler, i, time, true);
      }
      addMessage(handler, 10000, time, false);

      time = System.currentTimeMillis() + 5000;
      for (int i = 1; i < 10000; i++) {
         addMessage(handler, i, time, true);
      }
      addMessage(handler, 0, time, false);

      debugList(true, handler, 20000);

      validateSequence(handler);

   }

   @Test
   public void testScheduleFixedSample() throws Exception {
      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));

      addMessage(handler, 0, 48L, true);
      addMessage(handler, 1, 75L, true);
      addMessage(handler, 2, 56L, true);
      addMessage(handler, 3, 7L, false);
      addMessage(handler, 4, 69L, true);

      debugList(true, handler, 5);

   }

   @Test
   public void testScheduleWithAddHeads() throws Exception {
      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));

      addMessage(handler, 0, 1, true);
      addMessage(handler, 1, 2, true);
      addMessage(handler, 2, 3, true);
      addMessage(handler, 3, 3, true);
      addMessage(handler, 4, 4, true);

      addMessage(handler, 10, 5, false);
      addMessage(handler, 9, 5, false);
      addMessage(handler, 8, 5, false);
      addMessage(handler, 7, 5, false);
      addMessage(handler, 6, 5, false);
      addMessage(handler, 5, 5, false);

      validateSequence(handler);

   }

   @Test
   public void testScheduleFixedSampleTailAndHead() throws Exception {
      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));

      // mix a sequence of tails / heads, but at the end this was supposed to be all sequential
      addMessage(handler, 1, 48L, true);
      addMessage(handler, 2, 48L, true);
      addMessage(handler, 3, 48L, true);
      addMessage(handler, 4, 48L, true);
      addMessage(handler, 5, 48L, true);
      addMessage(handler, 0, 48L, false);

      addMessage(handler, 13, 59L, true);
      addMessage(handler, 14, 59L, true);
      addMessage(handler, 15, 59L, true);
      addMessage(handler, 16, 59L, true);
      addMessage(handler, 17, 59L, true);
      addMessage(handler, 12, 59L, false);

      addMessage(handler, 7, 49L, true);
      addMessage(handler, 8, 49L, true);
      addMessage(handler, 9, 49L, true);
      addMessage(handler, 10, 49L, true);
      addMessage(handler, 11, 49L, true);
      addMessage(handler, 6, 49L, false);

      validateSequence(handler);
   }

   @Test
   public void testScheduleNow() throws Exception {

      ExecutorService executor = Executors.newFixedThreadPool(50, ActiveMQThreadFactory.defaultThreadFactory());
      ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, ActiveMQThreadFactory.defaultThreadFactory());
      try {
         for (int i = 0; i < 100; i++) {
            // it's better to run the test a few times instead of run millions of messages here
            internalSchedule(executor, scheduler);
         }
      } finally {
         scheduler.shutdownNow();
         executor.shutdownNow();
      }
   }

   private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception {
      final int NUMBER_OF_MESSAGES = 200;
      int NUMBER_OF_THREADS = 20;

      final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
      final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler, fakeQueue);

      final long now = System.currentTimeMillis();

      final CountDownLatch latchDone = new CountDownLatch(NUMBER_OF_THREADS);

      final AtomicInteger error = new AtomicInteger(0);

      class ProducerThread implements Runnable {

         @Override
         public void run() {
            try {
               for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                  checkAndSchedule(handler, i, now, false, fakeQueue);
               }
            } catch (Exception e) {
               e.printStackTrace();
               error.incrementAndGet();
            } finally {
               latchDone.countDown();
            }

         }
      }

      for (int i = 0; i < NUMBER_OF_THREADS; i++) {
         executor.execute(new ProducerThread());
      }

      assertTrue(latchDone.await(1, TimeUnit.MINUTES));

      assertEquals(0, error.get());

      if (!fakeQueue.waitCompletion(2, TimeUnit.SECONDS)) {
         fail("Couldn't complete queue.add, expected " + NUMBER_OF_MESSAGES + ", still missing " + fakeQueue.expectedElements.toString());
      }
   }

   private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
      long lastSequence = -1;
      for (MessageReference ref : handler.getScheduledReferences()) {
         assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
         lastSequence = ref.getMessage().getMessageID();
      }
   }

   private void addMessage(ScheduledDeliveryHandlerImpl handler,
                           long nextMessageID,
                           long nextScheduledTime,
                           boolean tail) {
      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
      refImpl.setScheduledDeliveryTime(nextScheduledTime);
      handler.addInPlace(nextScheduledTime, refImpl, tail);
   }

   private void checkAndSchedule(ScheduledDeliveryHandlerImpl handler,
                                 long nextMessageID,
                                 long nextScheduledTime,
                                 boolean tail,
                                 Queue queue) {
      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
      refImpl.setScheduledDeliveryTime(nextScheduledTime);
      handler.checkAndSchedule(refImpl, tail);
   }

   private void debugList(boolean fail,
                          ScheduledDeliveryHandlerImpl handler,
                          long numberOfExpectedMessages) throws Exception {
      List<MessageReference> refs = handler.getScheduledReferences();

      HashSet<Long> messages = new HashSet<>();

      long lastTime = -1;

      for (MessageReference ref : refs) {
         assertFalse(messages.contains(ref.getMessage().getMessageID()));
         messages.add(ref.getMessage().getMessageID());

         if (fail) {
            assertTrue(ref.getScheduledDeliveryTime() >= lastTime);
         } else {
            if (ref.getScheduledDeliveryTime() < lastTime) {
               log.debug("^^^fail at " + ref.getScheduledDeliveryTime());
            }
         }
         lastTime = ref.getScheduledDeliveryTime();
      }

      for (long i = 0; i < numberOfExpectedMessages; i++) {
         assertTrue(messages.contains(Long.valueOf(i)));
      }
   }

   class FakeMessage implements Message {

      @Override
      public SimpleString getReplyTo() {
         return null;
      }

      @Override
      public Message setReplyTo(SimpleString address) {
         return null;
      }

      @Override
      public Object removeAnnotation(SimpleString key) {
         return null;
      }

      @Override
      public Object getAnnotation(SimpleString key) {
         return null;
      }

      @Override
      public void persist(ActiveMQBuffer targetRecord) {

      }

      @Override
      public int getDurableCount() {
         return 0;
      }

      @Override
      public Long getScheduledDeliveryTime() {
         return null;
      }

      @Override
      public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {

      }

      @Override
      public Persister<Message> getPersister() {
         return null;
      }

      @Override
      public int getPersistSize() {
         return 0;
      }
      final long id;

      @Override
      public CoreMessage toCore() {
         return toCore(null);
      }

      @Override
      public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
         return null;
      }

      FakeMessage(final long id) {
         this.id = id;
      }

      @Override
      public FakeMessage setMessageID(long id) {
         return this;
      }

      @Override
      public long getMessageID() {
         return id;
      }

      @Override
      public int durableUp() {
         return 0;
      }

      @Override
      public int durableDown() {
         return 0;
      }

      @Override
      public Message copy(long newID) {
         return null;
      }

      @Override
      public Message copy() {
         return null;
      }

      @Override
      public int getMemoryEstimate() {
         return 0;
      }

      @Override
      public int getRefCount() {
         return 0;
      }

      @Override
      public byte[] getDuplicateIDBytes() {
         return new byte[0];
      }

      @Override
      public Object getDuplicateProperty() {
         return null;
      }

      @Override
      public void messageChanged() {

      }

      @Override
      public UUID getUserID() {
         return null;
      }

      @Override
      public String getAddress() {
         return null;
      }

      @Override
      public SimpleString getAddressSimpleString() {
         return null;
      }

      @Override
      public Message setBuffer(ByteBuf buffer) {
         return null;
      }

      @Override
      public ByteBuf getBuffer() {
         return null;
      }
      @Override
      public Message setAddress(String address) {
         return null;
      }

      @Override
      public Message setAddress(SimpleString address) {
         return null;
      }

      @Override
      public boolean isDurable() {
         return false;
      }

      @Override
      public FakeMessage setDurable(boolean durable) {
         return this;
      }

      @Override
      public long getExpiration() {
         return 0;
      }

      @Override
      public boolean isExpired() {
         return false;
      }

      @Override
      public FakeMessage setExpiration(long expiration) {
         return this;
      }

      @Override
      public long getTimestamp() {
         return 0;
      }

      @Override
      public FakeMessage setTimestamp(long timestamp) {
         return this;
      }

      @Override
      public byte getPriority() {
         return 0;
      }

      @Override
      public FakeMessage setPriority(byte priority) {
         return this;
      }

      @Override
      public int getEncodeSize() {
         return 0;
      }

      @Override
      public boolean isLargeMessage() {
         return false;
      }

      @Override
      public Message putBooleanProperty(SimpleString key, boolean value) {
         return null;
      }

      @Override
      public Message putBooleanProperty(String key, boolean value) {
         return null;
      }

      @Override
      public Message putByteProperty(SimpleString key, byte value) {
         return null;
      }

      @Override
      public Message putByteProperty(String key, byte value) {
         return null;
      }

      @Override
      public Message putBytesProperty(SimpleString key, byte[] value) {
         return null;
      }

      @Override
      public Message putBytesProperty(String key, byte[] value) {
         return null;
      }

      @Override
      public Message putShortProperty(SimpleString key, short value) {
         return null;
      }

      @Override
      public Message putShortProperty(String key, short value) {
         return null;
      }

      @Override
      public Message putCharProperty(SimpleString key, char value) {
         return null;
      }

      @Override
      public Message putCharProperty(String key, char value) {
         return null;
      }

      @Override
      public Message putIntProperty(SimpleString key, int value) {
         return null;
      }

      @Override
      public Message putIntProperty(String key, int value) {
         return null;
      }

      @Override
      public Message putLongProperty(SimpleString key, long value) {
         return null;
      }

      @Override
      public Message putLongProperty(String key, long value) {
         return null;
      }

      @Override
      public Message putFloatProperty(SimpleString key, float value) {
         return null;
      }

      @Override
      public Message putFloatProperty(String key, float value) {
         return null;
      }

      @Override
      public Message putDoubleProperty(SimpleString key, double value) {
         return null;
      }

      @Override
      public Message putDoubleProperty(String key, double value) {
         return null;
      }

      @Override
      public Message putStringProperty(SimpleString key, SimpleString value) {
         return null;
      }

      @Override
      public Message putStringProperty(SimpleString key, String value) {
         return null;
      }

      @Override
      public Message putStringProperty(String key, String value) {
         return null;
      }

      @Override
      public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Object removeProperty(SimpleString key) {
         return null;
      }

      @Override
      public Object removeProperty(String key) {
         return null;
      }

      @Override
      public boolean containsProperty(SimpleString key) {
         return false;
      }

      @Override
      public boolean containsProperty(String key) {
         return false;
      }

      @Override
      public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Object getObjectProperty(SimpleString key) {
         return null;
      }

      @Override
      public Object getObjectProperty(String key) {
         return null;
      }

      @Override
      public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
         return null;
      }

      @Override
      public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
         return new byte[0];
      }

      @Override
      public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
         return new byte[0];
      }

      @Override
      public Set<SimpleString> getPropertyNames() {
         return null;
      }

      @Override
      public Map<String, Object> toMap() {
         return null;
      }

      @Override
      public Map<String, Object> toPropertyMap() {
         return null;
      }

      @Override
      public Message setUserID(Object userID) {
         return null;
      }

      @Override
      public void receiveBuffer(ByteBuf buffer) {

      }

      @Override
      public int getUsage() {
         return 0;
      }

      @Override
      public int usageUp() {
         return 0;
      }

      @Override
      public int usageDown() {
         return 0;
      }

      @Override
      public int refUp() {
         return 0;
      }

      @Override
      public int refDown() {
         return 0;
      }

      @Override
      public void sendBuffer(ByteBuf buffer, int count) {

      }

      @Override
      public long getPersistentSize() throws ActiveMQException {
         return 0;
      }

      @Override
      public Object getOwner() {
         return null;
      }

      @Override
      public void setOwner(Object object) {
      }

   }

   public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {

      @Override
      public void setPurgeOnNoConsumers(boolean value) {

      }

      @Override
      public boolean isEnabled() {
         return false;
      }

      @Override
      public void setEnabled(boolean value) {

      }

      @Override
      public PagingStore getPagingStore() {
         return null;
      }

      @Override
      public int durableUp(Message message) {
         return 1;
      }

      @Override
      public int durableDown(Message message) {
         return 1;
      }

      @Override
      public void refUp(MessageReference messageReference) {

      }

      @Override
      public MessageReference removeWithSuppliedID(long id, ToLongFunction<MessageReference> idSupplier) {
         return null;
      }

      @Override
      public void expireReferences(Runnable done) {

      }

      @Override
      public void refDown(MessageReference messageReference) {

      }

      @Override
      public void removeAddress() throws Exception {
      }

      @Override
      public long getAcknowledgeAttempts() {
         return 0;
      }

      @Override
      public boolean allowsReferenceCallback() {
         return false;
      }

      @Override
      public int getConsumersBeforeDispatch() {
         return 0;
      }

      @Override
      public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {

      }

      @Override
      public long getDelayBeforeDispatch() {
         return 0;
      }

      @Override
      public void setDelayBeforeDispatch(long delayBeforeDispatch) {

      }

      @Override
      public long getDispatchStartTime() {
         return 0;
      }

      @Override
      public boolean isDispatching() {
         return false;
      }

      @Override
      public void setDispatching(boolean dispatching) {

      }

      @Override
      public void setMaxConsumer(int maxConsumers) {

      }

      @Override
      public int getGroupBuckets() {
         return 0;
      }

      @Override
      public void setGroupBuckets(int groupBuckets) {

      }

      @Override
      public boolean isGroupRebalance() {
         return false;
      }

      @Override
      public void setGroupRebalance(boolean groupRebalance) {

      }

      @Override
      public boolean isGroupRebalancePauseDispatch() {
         return false;
      }

      @Override
      public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {

      }

      @Override
      public SimpleString getGroupFirstKey() {
         return null;
      }

      @Override
      public void setGroupFirstKey(SimpleString groupFirstKey) {

      }

      @Override
      public boolean isConfigurationManaged() {
         return false;
      }

      @Override
      public void setConfigurationManaged(boolean configurationManaged) {

      }

      @Override
      public void recheckRefCount(OperationContext context) {
      }

      @Override
      public void unproposed(SimpleString groupID) {

      }

      public FakeQueueForScheduleUnitTest(final int expectedElements) {
         super(EmptyCriticalAnalyzer.getInstance(), 1);
         this.expectedElements = new CountDownLatch(expectedElements);
      }

      @Override
      public boolean isPersistedPause() {
         return false;
      }

      public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception {
         return expectedElements.await(timeout, timeUnit);
      }

      final CountDownLatch expectedElements;
      LinkedList<MessageReference> messages = new LinkedList<>();

      @Override
      public SimpleString getName() {
         return null;
      }

      @Override
      public Long getID() {
         return Long.valueOf(0L);
      }

      @Override
      public void pause(boolean persist) {
      }

      @Override
      public void reloadPause(long recordID) {
      }

      @Override
      public Filter getFilter() {
         return null;
      }

      @Override
      public void setFilter(Filter filter) {
      }

      @Override
      public PageSubscription getPageSubscription() {
         return null;
      }

      @Override
      public RoutingType getRoutingType() {
         return null;
      }

      @Override
      public void setRoutingType(RoutingType routingType) {

      }

      @Override
      public boolean isDurable() {
         return false;
      }

      @Override
      public boolean isDurableMessage() {
         return false;
      }

      @Override
      public boolean isAutoDelete() {
         // no-op
         return false;
      }

      @Override
      public long getAutoDeleteDelay() {
         // no-op
         return -1;
      }

      @Override
      public long getAutoDeleteMessageCount() {
         // no-op
         return -1;
      }

      @Override
      public boolean isTemporary() {
         return false;
      }

      @Override
      public boolean isAutoCreated() {
         return false;
      }

      @Override
      public boolean isPurgeOnNoConsumers() {
         return false;
      }

      @Override
      public int getMaxConsumers() {
         return -1;
      }

      @Override
      public void addConsumer(Consumer consumer) throws Exception {

      }

      @Override
      public void addLingerSession(String sessionId) {

      }

      @Override
      public void removeLingerSession(String sessionId) {

      }

      @Override
      public void removeConsumer(Consumer consumer) {

      }

      @Override
      public int retryMessages(Filter filter) throws Exception {
         return 0;
      }

      @Override
      public int getConsumerCount() {
         return 0;
      }

      @Override
      public long getConsumerRemovedTimestamp() {
         return 0;
      }

      @Override
      public void setRingSize(long ringSize) {

      }

      @Override
      public long getRingSize() {
         return 0;
      }

      @Override
      public void setConsumersRefCount(ReferenceCounter referenceCounter) {

      }

      @Override
      public ReferenceCounter getConsumersRefCount() {
         return null;
      }

      @Override
      public void addSorted(List<MessageReference> refs, boolean scheduling) {
         addHead(refs, scheduling);
      }

      @Override
      public void reload(MessageReference ref) {

      }

      @Override
      public void addTail(MessageReference ref) {

      }

      @Override
      public void addTail(MessageReference ref, boolean direct) {

      }

      @Override
      public void addHead(MessageReference ref, boolean scheduling) {

      }

      @Override
      public void addSorted(MessageReference ref, boolean scheduling) {

      }

      @Override
      public void addHead(List<MessageReference> refs, boolean scheduling) {
         for (MessageReference ref : refs) {
            addFirst(ref);
         }
      }

      private void addFirst(MessageReference ref) {
         expectedElements.countDown();
         this.messages.addFirst(ref);
      }

      @Override
      public void acknowledge(MessageReference ref) throws Exception {

      }

      @Override
      public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {

      }

      @Override
      public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {

      }

      @Override
      public void acknowledge(Transaction tx, MessageReference ref) throws Exception {

      }

      @Override
      public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {

      }

      @Override
      public void reacknowledge(Transaction tx, MessageReference ref) throws Exception {

      }

      @Override
      public void cancel(Transaction tx, MessageReference ref) {

      }

      @Override
      public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {

      }

      @Override
      public void cancel(MessageReference reference, long timeBase) throws Exception {

      }

      @Override
      public void deliverAsync() {

      }

      @Override
      public void forceDelivery() {

      }

      @Override
      public void deleteQueue() throws Exception {

      }

      @Override
      public void deleteQueue(boolean removeConsumers) throws Exception {

      }

      @Override
      public void destroyPaging() throws Exception {

      }

      @Override
      public long getMessageCount() {
         return 0;
      }

      @Override
      public long getPersistentSize() {
         return 0;
      }

      @Override
      public long getDurableMessageCount() {
         return 0;
      }

      @Override
      public long getDurablePersistentSize() {
         return 0;
      }

      @Override
      public int getDeliveringCount() {
         return 0;
      }

      @Override
      public long getDeliveringSize() {
         return 0;
      }

      @Override
      public int getDurableDeliveringCount() {
         return 0;
      }

      @Override
      public long getDurableDeliveringSize() {
         return 0;
      }

      @Override
      public int getDurableScheduledCount() {
         return 0;
      }

      @Override
      public long getDurableScheduledSize() {
         return 0;
      }

      @Override
      public void referenceHandled(MessageReference ref) {

      }

      @Override
      public int getScheduledCount() {
         return 0;
      }

      @Override
      public long getScheduledSize() {
         return 0;
      }

      @Override
      public List<MessageReference> getScheduledMessages() {
         return null;
      }

      @Override
      public Map<String, List<MessageReference>> getDeliveringMessages() {
         return null;
      }

      @Override
      public long getMessagesAdded() {
         return 0;
      }

      @Override
      public long getMessagesAcknowledged() {
         return 0;
      }

      @Override
      public long getMessagesExpired() {
         return 0;
      }

      @Override
      public long getMessagesKilled() {
         return 0;
      }

      @Override
      public long getMessagesReplaced() {
         return 0;
      }

      @Override
      public MessageReference removeReferenceWithID(long id) throws Exception {
         return null;
      }

      @Override
      public MessageReference getReference(long id) {
         return null;
      }

      @Override
      public int deleteAllReferences() throws Exception {
         return 0;
      }

      @Override
      public int deleteAllReferences(int flushLimit) throws Exception {
         return 0;
      }

      @Override
      public boolean deleteReference(long messageID) throws Exception {
         return false;
      }

      @Override
      public int deleteMatchingReferences(Filter filter) throws Exception {
         return 0;
      }

      @Override
      public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
         return 0;
      }

      @Override
      public boolean expireReference(long messageID) throws Exception {
         return false;
      }

      @Override
      public int expireReferences(Filter filter) throws Exception {
         return 0;
      }

      @Override
      public void expireReferences() {

      }

      @Override
      public void expire(MessageReference ref) throws Exception {

      }

      @Override
      public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {

      }

      @Override
      public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
         return false;
      }

      @Override
      public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
         return false;
      }

      @Override
      public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
         return 0;
      }

      @Override
      public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception {
         return false;
      }

      @Override
      public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception {
         return 0;
      }

      @Override
      public boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception {
         return false;
      }

      @Override
      public int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception {
         return 0;
      }

      @Override
      public int moveReferences(int flushLimit,
                                Filter filter,
                                SimpleString toAddress,
                                boolean rejectDuplicates,
                                Binding binding) throws Exception {
         return 0;
      }

      @Override
      public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception {
         return 0;
      }

      @Override
      public void addRedistributor(long delay) {

      }

      @Override
      public void cancelRedistributor() throws Exception {

      }

      @Override
      public boolean hasMatchingConsumer(Message message) {
         return false;
      }

      @Override
      public Collection<Consumer> getConsumers() {
         return null;
      }

      @Override
      public Map<SimpleString, Consumer> getGroups() {
         return null;
      }

      @Override
      public void resetGroup(SimpleString groupID) {

      }

      @Override
      public void resetAllGroups() {

      }

      @Override
      public int getGroupCount() {
         return 0;
      }

      @Override
      public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
                                                    long timeBase,
                                                    boolean ignoreRedeliveryDelay) throws Exception {
         return new Pair<>(false, false);
      }

      @Override
      public LinkedListIterator<MessageReference> iterator() {
         return null;
      }

      @Override
      public LinkedListIterator<MessageReference> browserIterator() {
         return null;
      }

      @Override
      public SimpleString getExpiryAddress() {
         return null;
      }

      @Override
      public void pause() {

      }

      @Override
      public void resume() {

      }

      @Override
      public boolean isPaused() {
         return false;
      }

      @Override
      public Executor getExecutor() {
         return null;
      }

      @Override
      public void resetAllIterators() {

      }

      @Override
      public boolean flushExecutor() {
         return false;
      }

      @Override
      public void close() throws Exception {

      }

      @Override
      public boolean isDirectDeliver() {
         return false;
      }

      @Override
      public SimpleString getAddress() {
         return null;
      }

      @Override
      public boolean isInternalQueue() {
         return false;
      }

      @Override
      public void setInternalQueue(boolean internalQueue) {

      }

      @Override
      public void resetMessagesAdded() {

      }

      @Override
      public void resetMessagesAcknowledged() {

      }

      @Override
      public void resetMessagesExpired() {

      }

      @Override
      public void resetMessagesKilled() {

      }

      @Override
      public void incrementMesssagesAdded() {

      }

      @Override
      public void deliverScheduledMessages() {

      }

      @Override
      public void route(Message message, RoutingContext context) throws Exception {

      }

      @Override
      public void routeWithAck(Message message, RoutingContext context) {

      }

      @Override
      public void postAcknowledge(MessageReference ref, AckReason reason) {

      }

      @Override
      public SimpleString getUser() {
         return null;
      }

      @Override
      public void setUser(SimpleString user) {

      }

      @Override
      public boolean isLastValue() {
         return false;
      }

      @Override
      public SimpleString getLastValueKey() {
         return null;
      }

      @Override
      public boolean isNonDestructive() {
         return false;
      }

      @Override
      public void setNonDestructive(boolean nonDestructive) {

      }

      @Override
      public boolean isExclusive() {
         return false;
      }

      @Override
      public void setExclusive(boolean exclusive) {

      }

   }
}
