blob: 068a442bccaf25fe7f5bc214334077f0b0fd66b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
public class ScheduledDeliveryHandlerTest extends Assert {
private static final Logger log = Logger.getLogger(ScheduledDeliveryHandlerTest.class);
@Test
public void testScheduleRandom() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
long nextMessage = 0;
long NUMBER_OF_SEQUENCES = 100000;
for (int i = 0; i < NUMBER_OF_SEQUENCES; i++) {
int numberOfMessages = RandomUtil.randomInt() % 10;
if (numberOfMessages == 0)
numberOfMessages = 1;
long nextScheduledTime = RandomUtil.randomPositiveLong();
for (int j = 0; j < numberOfMessages; j++) {
boolean tail = RandomUtil.randomBoolean();
addMessage(handler, nextMessage++, nextScheduledTime, tail);
}
}
debugList(true, handler, nextMessage);
}
@Test
public void testScheduleSameTimeHeadAndTail() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
long time = System.currentTimeMillis() + 10000;
for (int i = 10001; i < 20000; i++) {
addMessage(handler, i, time, true);
}
addMessage(handler, 10000, time, false);
time = System.currentTimeMillis() + 5000;
for (int i = 1; i < 10000; i++) {
addMessage(handler, i, time, true);
}
addMessage(handler, 0, time, false);
debugList(true, handler, 20000);
validateSequence(handler);
}
@Test
public void testScheduleFixedSample() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
addMessage(handler, 0, 48L, true);
addMessage(handler, 1, 75L, true);
addMessage(handler, 2, 56L, true);
addMessage(handler, 3, 7L, false);
addMessage(handler, 4, 69L, true);
debugList(true, handler, 5);
}
@Test
public void testScheduleWithAddHeads() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
addMessage(handler, 0, 1, true);
addMessage(handler, 1, 2, true);
addMessage(handler, 2, 3, true);
addMessage(handler, 3, 3, true);
addMessage(handler, 4, 4, true);
addMessage(handler, 10, 5, false);
addMessage(handler, 9, 5, false);
addMessage(handler, 8, 5, false);
addMessage(handler, 7, 5, false);
addMessage(handler, 6, 5, false);
addMessage(handler, 5, 5, false);
validateSequence(handler);
}
@Test
public void testScheduleFixedSampleTailAndHead() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
// mix a sequence of tails / heads, but at the end this was supposed to be all sequential
addMessage(handler, 1, 48L, true);
addMessage(handler, 2, 48L, true);
addMessage(handler, 3, 48L, true);
addMessage(handler, 4, 48L, true);
addMessage(handler, 5, 48L, true);
addMessage(handler, 0, 48L, false);
addMessage(handler, 13, 59L, true);
addMessage(handler, 14, 59L, true);
addMessage(handler, 15, 59L, true);
addMessage(handler, 16, 59L, true);
addMessage(handler, 17, 59L, true);
addMessage(handler, 12, 59L, false);
addMessage(handler, 7, 49L, true);
addMessage(handler, 8, 49L, true);
addMessage(handler, 9, 49L, true);
addMessage(handler, 10, 49L, true);
addMessage(handler, 11, 49L, true);
addMessage(handler, 6, 49L, false);
validateSequence(handler);
}
@Test
public void testScheduleNow() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(50, ActiveMQThreadFactory.defaultThreadFactory());
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, ActiveMQThreadFactory.defaultThreadFactory());
try {
for (int i = 0; i < 100; i++) {
// it's better to run the test a few times instead of run millions of messages here
internalSchedule(executor, scheduler);
}
} finally {
scheduler.shutdownNow();
executor.shutdownNow();
}
}
private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception {
final int NUMBER_OF_MESSAGES = 200;
int NUMBER_OF_THREADS = 20;
final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler, fakeQueue);
final long now = System.currentTimeMillis();
final CountDownLatch latchDone = new CountDownLatch(NUMBER_OF_THREADS);
final AtomicInteger error = new AtomicInteger(0);
class ProducerThread implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
checkAndSchedule(handler, i, now, false, fakeQueue);
}
} catch (Exception e) {
e.printStackTrace();
error.incrementAndGet();
} finally {
latchDone.countDown();
}
}
}
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
executor.execute(new ProducerThread());
}
assertTrue(latchDone.await(1, TimeUnit.MINUTES));
assertEquals(0, error.get());
if (!fakeQueue.waitCompletion(2, TimeUnit.SECONDS)) {
fail("Couldn't complete queue.add, expected " + NUMBER_OF_MESSAGES + ", still missing " + fakeQueue.expectedElements.toString());
}
}
private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
long lastSequence = -1;
for (MessageReference ref : handler.getScheduledReferences()) {
assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
lastSequence = ref.getMessage().getMessageID();
}
}
private void addMessage(ScheduledDeliveryHandlerImpl handler,
long nextMessageID,
long nextScheduledTime,
boolean tail) {
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.addInPlace(nextScheduledTime, refImpl, tail);
}
private void checkAndSchedule(ScheduledDeliveryHandlerImpl handler,
long nextMessageID,
long nextScheduledTime,
boolean tail,
Queue queue) {
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.checkAndSchedule(refImpl, tail);
}
private void debugList(boolean fail,
ScheduledDeliveryHandlerImpl handler,
long numberOfExpectedMessages) throws Exception {
List<MessageReference> refs = handler.getScheduledReferences();
HashSet<Long> messages = new HashSet<>();
long lastTime = -1;
for (MessageReference ref : refs) {
assertFalse(messages.contains(ref.getMessage().getMessageID()));
messages.add(ref.getMessage().getMessageID());
if (fail) {
assertTrue(ref.getScheduledDeliveryTime() >= lastTime);
} else {
if (ref.getScheduledDeliveryTime() < lastTime) {
log.debug("^^^fail at " + ref.getScheduledDeliveryTime());
}
}
lastTime = ref.getScheduledDeliveryTime();
}
for (long i = 0; i < numberOfExpectedMessages; i++) {
assertTrue(messages.contains(Long.valueOf(i)));
}
}
class FakeMessage implements Message {
@Override
public SimpleString getReplyTo() {
return null;
}
@Override
public Message setReplyTo(SimpleString address) {
return null;
}
@Override
public Object removeAnnotation(SimpleString key) {
return null;
}
@Override
public Object getAnnotation(SimpleString key) {
return null;
}
@Override
public void persist(ActiveMQBuffer targetRecord) {
}
@Override
public int getDurableCount() {
return 0;
}
@Override
public Long getScheduledDeliveryTime() {
return null;
}
@Override
public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}
@Override
public Persister<Message> getPersister() {
return null;
}
@Override
public int getPersistSize() {
return 0;
}
final long id;
@Override
public CoreMessage toCore() {
return toCore(null);
}
@Override
public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null;
}
FakeMessage(final long id) {
this.id = id;
}
@Override
public FakeMessage setMessageID(long id) {
return this;
}
@Override
public long getMessageID() {
return id;
}
@Override
public int durableUp() {
return 0;
}
@Override
public int durableDown() {
return 0;
}
@Override
public Message copy(long newID) {
return null;
}
@Override
public Message copy() {
return null;
}
@Override
public int getMemoryEstimate() {
return 0;
}
@Override
public int getRefCount() {
return 0;
}
@Override
public byte[] getDuplicateIDBytes() {
return new byte[0];
}
@Override
public Object getDuplicateProperty() {
return null;
}
@Override
public void messageChanged() {
}
@Override
public UUID getUserID() {
return null;
}
@Override
public String getAddress() {
return null;
}
@Override
public SimpleString getAddressSimpleString() {
return null;
}
@Override
public Message setBuffer(ByteBuf buffer) {
return null;
}
@Override
public ByteBuf getBuffer() {
return null;
}
@Override
public Message setAddress(String address) {
return null;
}
@Override
public Message setAddress(SimpleString address) {
return null;
}
@Override
public boolean isDurable() {
return false;
}
@Override
public FakeMessage setDurable(boolean durable) {
return this;
}
@Override
public long getExpiration() {
return 0;
}
@Override
public boolean isExpired() {
return false;
}
@Override
public FakeMessage setExpiration(long expiration) {
return this;
}
@Override
public long getTimestamp() {
return 0;
}
@Override
public FakeMessage setTimestamp(long timestamp) {
return this;
}
@Override
public byte getPriority() {
return 0;
}
@Override
public FakeMessage setPriority(byte priority) {
return this;
}
@Override
public int getEncodeSize() {
return 0;
}
@Override
public boolean isLargeMessage() {
return false;
}
@Override
public Message putBooleanProperty(SimpleString key, boolean value) {
return null;
}
@Override
public Message putBooleanProperty(String key, boolean value) {
return null;
}
@Override
public Message putByteProperty(SimpleString key, byte value) {
return null;
}
@Override
public Message putByteProperty(String key, byte value) {
return null;
}
@Override
public Message putBytesProperty(SimpleString key, byte[] value) {
return null;
}
@Override
public Message putBytesProperty(String key, byte[] value) {
return null;
}
@Override
public Message putShortProperty(SimpleString key, short value) {
return null;
}
@Override
public Message putShortProperty(String key, short value) {
return null;
}
@Override
public Message putCharProperty(SimpleString key, char value) {
return null;
}
@Override
public Message putCharProperty(String key, char value) {
return null;
}
@Override
public Message putIntProperty(SimpleString key, int value) {
return null;
}
@Override
public Message putIntProperty(String key, int value) {
return null;
}
@Override
public Message putLongProperty(SimpleString key, long value) {
return null;
}
@Override
public Message putLongProperty(String key, long value) {
return null;
}
@Override
public Message putFloatProperty(SimpleString key, float value) {
return null;
}
@Override
public Message putFloatProperty(String key, float value) {
return null;
}
@Override
public Message putDoubleProperty(SimpleString key, double value) {
return null;
}
@Override
public Message putDoubleProperty(String key, double value) {
return null;
}
@Override
public Message putStringProperty(SimpleString key, SimpleString value) {
return null;
}
@Override
public Message putStringProperty(SimpleString key, String value) {
return null;
}
@Override
public Message putStringProperty(String key, String value) {
return null;
}
@Override
public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Object removeProperty(SimpleString key) {
return null;
}
@Override
public Object removeProperty(String key) {
return null;
}
@Override
public boolean containsProperty(SimpleString key) {
return false;
}
@Override
public boolean containsProperty(String key) {
return false;
}
@Override
public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Object getObjectProperty(SimpleString key) {
return null;
}
@Override
public Object getObjectProperty(String key) {
return null;
}
@Override
public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return null;
}
@Override
public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
return new byte[0];
}
@Override
public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
return new byte[0];
}
@Override
public Set<SimpleString> getPropertyNames() {
return null;
}
@Override
public Map<String, Object> toMap() {
return null;
}
@Override
public Map<String, Object> toPropertyMap() {
return null;
}
@Override
public Message setUserID(Object userID) {
return null;
}
@Override
public void receiveBuffer(ByteBuf buffer) {
}
@Override
public int getUsage() {
return 0;
}
@Override
public int usageUp() {
return 0;
}
@Override
public int usageDown() {
return 0;
}
@Override
public int refUp() {
return 0;
}
@Override
public int refDown() {
return 0;
}
@Override
public void sendBuffer(ByteBuf buffer, int count) {
}
@Override
public long getPersistentSize() throws ActiveMQException {
return 0;
}
@Override
public Object getOwner() {
return null;
}
@Override
public void setOwner(Object object) {
}
}
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
@Override
public void setPurgeOnNoConsumers(boolean value) {
}
@Override
public boolean isEnabled() {
return false;
}
@Override
public void setEnabled(boolean value) {
}
@Override
public PagingStore getPagingStore() {
return null;
}
@Override
public int durableUp(Message message) {
return 1;
}
@Override
public int durableDown(Message message) {
return 1;
}
@Override
public void refUp(MessageReference messageReference) {
}
@Override
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
return null;
}
@Override
public void expireReferences(Runnable done) {
}
@Override
public void refDown(MessageReference messageReference) {
}
@Override
public void removeAddress() throws Exception {
}
@Override
public long getAcknowledgeAttempts() {
return 0;
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
@Override
public int getConsumersBeforeDispatch() {
return 0;
}
@Override
public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
}
@Override
public long getDelayBeforeDispatch() {
return 0;
}
@Override
public void setDelayBeforeDispatch(long delayBeforeDispatch) {
}
@Override
public long getDispatchStartTime() {
return 0;
}
@Override
public boolean isDispatching() {
return false;
}
@Override
public void setDispatching(boolean dispatching) {
}
@Override
public void setMaxConsumer(int maxConsumers) {
}
@Override
public int getGroupBuckets() {
return 0;
}
@Override
public void setGroupBuckets(int groupBuckets) {
}
@Override
public boolean isGroupRebalance() {
return false;
}
@Override
public void setGroupRebalance(boolean groupRebalance) {
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return false;
}
@Override
public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
}
@Override
public SimpleString getGroupFirstKey() {
return null;
}
@Override
public void setGroupFirstKey(SimpleString groupFirstKey) {
}
@Override
public boolean isConfigurationManaged() {
return false;
}
@Override
public void setConfigurationManaged(boolean configurationManaged) {
}
@Override
public void recheckRefCount(OperationContext context) {
}
@Override
public void unproposed(SimpleString groupID) {
}
public FakeQueueForScheduleUnitTest(final int expectedElements) {
super(EmptyCriticalAnalyzer.getInstance(), 1);
this.expectedElements = new CountDownLatch(expectedElements);
}
@Override
public boolean isPersistedPause() {
return false;
}
public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception {
return expectedElements.await(timeout, timeUnit);
}
final CountDownLatch expectedElements;
LinkedList<MessageReference> messages = new LinkedList<>();
@Override
public SimpleString getName() {
return null;
}
@Override
public Long getID() {
return Long.valueOf(0L);
}
@Override
public void pause(boolean persist) {
}
@Override
public void reloadPause(long recordID) {
}
@Override
public Filter getFilter() {
return null;
}
@Override
public void setFilter(Filter filter) {
}
@Override
public PageSubscription getPageSubscription() {
return null;
}
@Override
public RoutingType getRoutingType() {
return null;
}
@Override
public void setRoutingType(RoutingType routingType) {
}
@Override
public boolean isDurable() {
return false;
}
@Override
public boolean isDurableMessage() {
return false;
}
@Override
public boolean isAutoDelete() {
// no-op
return false;
}
@Override
public long getAutoDeleteDelay() {
// no-op
return -1;
}
@Override
public long getAutoDeleteMessageCount() {
// no-op
return -1;
}
@Override
public boolean isTemporary() {
return false;
}
@Override
public boolean isAutoCreated() {
return false;
}
@Override
public boolean isPurgeOnNoConsumers() {
return false;
}
@Override
public int getMaxConsumers() {
return -1;
}
@Override
public void addConsumer(Consumer consumer) throws Exception {
}
@Override
public void addLingerSession(String sessionId) {
}
@Override
public void removeLingerSession(String sessionId) {
}
@Override
public void removeConsumer(Consumer consumer) {
}
@Override
public int retryMessages(Filter filter) throws Exception {
return 0;
}
@Override
public int getConsumerCount() {
return 0;
}
@Override
public long getConsumerRemovedTimestamp() {
return 0;
}
@Override
public void setRingSize(long ringSize) {
}
@Override
public long getRingSize() {
return 0;
}
@Override
public void setConsumersRefCount(ReferenceCounter referenceCounter) {
}
@Override
public ReferenceCounter getConsumersRefCount() {
return null;
}
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
addHead(refs, scheduling);
}
@Override
public void reload(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref, boolean direct) {
}
@Override
public void addHead(MessageReference ref, boolean scheduling) {
}
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
}
@Override
public void addHead(List<MessageReference> refs, boolean scheduling) {
for (MessageReference ref : refs) {
addFirst(ref);
}
}
private void addFirst(MessageReference ref) {
expectedElements.countDown();
this.messages.addFirst(ref);
}
@Override
public void acknowledge(MessageReference ref) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@Override
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void cancel(Transaction tx, MessageReference ref) {
}
@Override
public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {
}
@Override
public void cancel(MessageReference reference, long timeBase) throws Exception {
}
@Override
public void deliverAsync() {
}
@Override
public void forceDelivery() {
}
@Override
public void deleteQueue() throws Exception {
}
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
}
@Override
public void destroyPaging() throws Exception {
}
@Override
public long getMessageCount() {
return 0;
}
@Override
public long getPersistentSize() {
return 0;
}
@Override
public long getDurableMessageCount() {
return 0;
}
@Override
public long getDurablePersistentSize() {
return 0;
}
@Override
public int getDeliveringCount() {
return 0;
}
@Override
public long getDeliveringSize() {
return 0;
}
@Override
public int getDurableDeliveringCount() {
return 0;
}
@Override
public long getDurableDeliveringSize() {
return 0;
}
@Override
public int getDurableScheduledCount() {
return 0;
}
@Override
public long getDurableScheduledSize() {
return 0;
}
@Override
public void referenceHandled(MessageReference ref) {
}
@Override
public int getScheduledCount() {
return 0;
}
@Override
public long getScheduledSize() {
return 0;
}
@Override
public List<MessageReference> getScheduledMessages() {
return null;
}
@Override
public Map<String, List<MessageReference>> getDeliveringMessages() {
return null;
}
@Override
public long getMessagesAdded() {
return 0;
}
@Override
public long getMessagesAcknowledged() {
return 0;
}
@Override
public long getMessagesExpired() {
return 0;
}
@Override
public long getMessagesKilled() {
return 0;
}
@Override
public long getMessagesReplaced() {
return 0;
}
@Override
public MessageReference removeReferenceWithID(long id) throws Exception {
return null;
}
@Override
public MessageReference getReference(long id) {
return null;
}
@Override
public int deleteAllReferences() throws Exception {
return 0;
}
@Override
public int deleteAllReferences(int flushLimit) throws Exception {
return 0;
}
@Override
public boolean deleteReference(long messageID) throws Exception {
return false;
}
@Override
public int deleteMatchingReferences(Filter filter) throws Exception {
return 0;
}
@Override
public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
return 0;
}
@Override
public boolean expireReference(long messageID) throws Exception {
return false;
}
@Override
public int expireReferences(Filter filter) throws Exception {
return 0;
}
@Override
public void expireReferences() {
}
@Override
public void expire(MessageReference ref) throws Exception {
}
@Override
public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
}
@Override
public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
return false;
}
@Override
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
return 0;
}
@Override
public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception {
return false;
}
@Override
public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception {
return 0;
}
@Override
public boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception {
return false;
}
@Override
public int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception {
return 0;
}
@Override
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates,
Binding binding) throws Exception {
return 0;
}
@Override
public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception {
return 0;
}
@Override
public void addRedistributor(long delay) {
}
@Override
public void cancelRedistributor() throws Exception {
}
@Override
public boolean hasMatchingConsumer(Message message) {
return false;
}
@Override
public Collection<Consumer> getConsumers() {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
long timeBase,
boolean ignoreRedeliveryDelay) throws Exception {
return new Pair<>(false, false);
}
@Override
public LinkedListIterator<MessageReference> iterator() {
return null;
}
@Override
public LinkedListIterator<MessageReference> browserIterator() {
return null;
}
@Override
public SimpleString getExpiryAddress() {
return null;
}
@Override
public void pause() {
}
@Override
public void resume() {
}
@Override
public boolean isPaused() {
return false;
}
@Override
public Executor getExecutor() {
return null;
}
@Override
public void resetAllIterators() {
}
@Override
public boolean flushExecutor() {
return false;
}
@Override
public void close() throws Exception {
}
@Override
public boolean isDirectDeliver() {
return false;
}
@Override
public SimpleString getAddress() {
return null;
}
@Override
public boolean isInternalQueue() {
return false;
}
@Override
public void setInternalQueue(boolean internalQueue) {
}
@Override
public void resetMessagesAdded() {
}
@Override
public void resetMessagesAcknowledged() {
}
@Override
public void resetMessagesExpired() {
}
@Override
public void resetMessagesKilled() {
}
@Override
public void incrementMesssagesAdded() {
}
@Override
public void deliverScheduledMessages() {
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
}
@Override
public void routeWithAck(Message message, RoutingContext context) {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
}
@Override
public SimpleString getUser() {
return null;
}
@Override
public void setUser(SimpleString user) {
}
@Override
public boolean isLastValue() {
return false;
}
@Override
public SimpleString getLastValueKey() {
return null;
}
@Override
public boolean isNonDestructive() {
return false;
}
@Override
public void setNonDestructive(boolean nonDestructive) {
}
@Override
public boolean isExclusive() {
return false;
}
@Override
public void setExclusive(boolean exclusive) {
}
}
}