blob: b5bdde941461ab052a34db74eec1cb97d53bedbe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.command;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.state.CommandVisitor;
/**
* @openwire:marshaller code="5"
*
*/
public class ConsumerInfo extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
public static final byte HIGH_PRIORITY = 10;
public static final byte NORMAL_PRIORITY = 0;
public static final byte NETWORK_CONSUMER_PRIORITY = -5;
public static final byte LOW_PRIORITY = -10;
protected ConsumerId consumerId;
protected ActiveMQDestination destination;
protected int prefetchSize;
protected int maximumPendingMessageLimit;
protected boolean browser;
protected boolean dispatchAsync;
protected String selector;
protected String subscriptionName;
protected boolean noLocal;
protected boolean exclusive;
protected boolean retroactive;
protected byte priority;
protected BrokerId[] brokerPath;
protected boolean optimizedAcknowledge;
// used by the broker
protected transient int currentPrefetchSize;
// if true, the consumer will not send range
protected boolean noRangeAcks;
// acks.
protected BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; // this subscription
protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
// not marshalled, populated from RemoveInfo, the last message delivered, used
// to suppress redelivery on prefetched messages after close
private transient long lastDeliveredSequenceId;
// originated from a
// network connection
public ConsumerInfo() {
}
public ConsumerInfo(ConsumerId consumerId) {
this.consumerId = consumerId;
}
public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
}
public ConsumerInfo copy() {
ConsumerInfo info = new ConsumerInfo();
copy(info);
return info;
}
public void copy(ConsumerInfo info) {
super.copy(info);
info.consumerId = consumerId;
info.destination = destination;
info.prefetchSize = prefetchSize;
info.maximumPendingMessageLimit = maximumPendingMessageLimit;
info.browser = browser;
info.dispatchAsync = dispatchAsync;
info.selector = selector;
info.subscriptionName = subscriptionName;
info.noLocal = noLocal;
info.exclusive = exclusive;
info.retroactive = retroactive;
info.priority = priority;
info.brokerPath = brokerPath;
info.networkSubscription = networkSubscription;
if (networkConsumerIds != null) {
if (info.networkConsumerIds==null){
info.networkConsumerIds=new ArrayList<ConsumerId>();
}
info.networkConsumerIds.addAll(networkConsumerIds);
}
}
public boolean isDurable() {
return subscriptionName != null;
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
/**
* Is used to uniquely identify the consumer to the broker.
*
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
return consumerId;
}
public void setConsumerId(ConsumerId consumerId) {
this.consumerId = consumerId;
}
/**
* Is this consumer a queue browser?
*
* @openwire:property version=1
*/
public boolean isBrowser() {
return browser;
}
public void setBrowser(boolean browser) {
this.browser = browser;
}
/**
* The destination that the consumer is interested in receiving messages
* from. This destination could be a composite destination.
*
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
/**
* How many messages a broker will send to the client without receiving an
* ack before he stops dispatching messages to the client.
*
* @openwire:property version=1
*/
public int getPrefetchSize() {
return prefetchSize;
}
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
this.currentPrefetchSize = prefetchSize;
}
/**
* How many messages a broker will keep around, above the prefetch limit,
* for non-durable topics before starting to discard older messages.
*
* @openwire:property version=1
*/
public int getMaximumPendingMessageLimit() {
return maximumPendingMessageLimit;
}
public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
this.maximumPendingMessageLimit = maximumPendingMessageLimit;
}
/**
* Should the broker dispatch a message to the consumer async? If he does it
* async, then he uses a more SEDA style of processing while if it is not
* done async, then he broker use a STP style of processing. STP is more
* appropriate in high bandwidth situations or when being used by and in vm
* transport.
*
* @openwire:property version=1
*/
public boolean isDispatchAsync() {
return dispatchAsync;
}
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync = dispatchAsync;
}
/**
* The JMS selector used to filter out messages that this consumer is
* interested in.
*
* @openwire:property version=1
*/
public String getSelector() {
return selector;
}
public void setSelector(String selector) {
this.selector = selector;
}
/**
* Used to identify the name of a durable subscription.
*
* @openwire:property version=1
*/
public String getSubscriptionName() {
return subscriptionName;
}
public void setSubscriptionName(String durableSubscriptionId) {
this.subscriptionName = durableSubscriptionId;
}
/**
* @deprecated
* @return
* @see getSubscriptionName
*/
public String getSubcriptionName() {
return subscriptionName;
}
/**
* @deprecated
* @see setSubscriptionName
* @param durableSubscriptionId
*/
public void setSubcriptionName(String durableSubscriptionId) {
this.subscriptionName = durableSubscriptionId;
}
/**
* Set noLocal to true to avoid receiving messages that were published
* locally on the same connection.
*
* @openwire:property version=1
*/
public boolean isNoLocal() {
return noLocal;
}
public void setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
}
/**
* An exclusive consumer locks out other consumers from being able to
* receive messages from the destination. If there are multiple exclusive
* consumers for a destination, the first one created will be the exclusive
* consumer of the destination.
*
* @openwire:property version=1
*/
public boolean isExclusive() {
return exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
/**
* A retroactive consumer only has meaning for Topics. It allows a consumer
* to retroactively see messages sent prior to the consumer being created.
* If the consumer is not durable, it will be delivered the last message
* published to the topic. If the consumer is durable then it will receive
* all persistent messages that are still stored in persistent storage for
* that topic.
*
* @openwire:property version=1
*/
public boolean isRetroactive() {
return retroactive;
}
public void setRetroactive(boolean retroactive) {
this.retroactive = retroactive;
}
public RemoveInfo createRemoveCommand() {
RemoveInfo command = new RemoveInfo(getConsumerId());
command.setResponseRequired(isResponseRequired());
return command;
}
/**
* The broker will avoid dispatching to a lower priority consumer if there
* are other higher priority consumers available to dispatch to. This allows
* letting the broker to have an affinity to higher priority consumers.
* Default priority is 0.
*
* @openwire:property version=1
*/
public byte getPriority() {
return priority;
}
public void setPriority(byte priority) {
this.priority = priority;
}
/**
* The route of brokers the command has moved through.
*
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
return brokerPath;
}
public void setBrokerPath(BrokerId[] brokerPath) {
this.brokerPath = brokerPath;
}
/**
* A transient additional predicate that can be used it inject additional
* predicates into the selector on the fly. Handy if if say a Security
* Broker interceptor wants to filter out messages based on security level
* of the consumer.
*
* @openwire:property version=1
*/
public BooleanExpression getAdditionalPredicate() {
return additionalPredicate;
}
public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
this.additionalPredicate = additionalPredicate;
}
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processAddConsumer(this);
}
/**
* @openwire:property version=1
* @return Returns the networkSubscription.
*/
public boolean isNetworkSubscription() {
return networkSubscription;
}
/**
* @param networkSubscription The networkSubscription to set.
*/
public void setNetworkSubscription(boolean networkSubscription) {
this.networkSubscription = networkSubscription;
}
/**
* @openwire:property version=1
* @return Returns the optimizedAcknowledge.
*/
public boolean isOptimizedAcknowledge() {
return optimizedAcknowledge;
}
/**
* @param optimizedAcknowledge The optimizedAcknowledge to set.
*/
public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
this.optimizedAcknowledge = optimizedAcknowledge;
}
/**
* @return Returns the currentPrefetchSize.
*/
public int getCurrentPrefetchSize() {
return currentPrefetchSize;
}
/**
* @param currentPrefetchSize The currentPrefetchSize to set.
*/
public void setCurrentPrefetchSize(int currentPrefetchSize) {
this.currentPrefetchSize = currentPrefetchSize;
}
/**
* The broker may be able to optimize it's processing or provides better QOS
* if it knows the consumer will not be sending ranged acks.
*
* @return true if the consumer will not send range acks.
* @openwire:property version=1
*/
public boolean isNoRangeAcks() {
return noRangeAcks;
}
public void setNoRangeAcks(boolean noRangeAcks) {
this.noRangeAcks = noRangeAcks;
}
public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
if (networkConsumerIds == null) {
networkConsumerIds = new ArrayList<ConsumerId>();
}
networkConsumerIds.add(networkConsumerId);
}
public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
if (networkConsumerIds != null) {
networkConsumerIds.remove(networkConsumerId);
if (networkConsumerIds.isEmpty()) {
networkConsumerIds=null;
}
}
}
public synchronized boolean isNetworkConsumersEmpty() {
return networkConsumerIds == null || networkConsumerIds.isEmpty();
}
public synchronized List<ConsumerId> getNetworkConsumerIds(){
List<ConsumerId> result = new ArrayList<ConsumerId>();
if (networkConsumerIds != null) {
result.addAll(networkConsumerIds);
}
return result;
}
/**
* Tracks the original subscription id that causes a subscription to
* percolate through a network when networkTTL > 1. Tracking the original
* subscription allows duplicate suppression.
*
* @return array of the current subscription path
* @openwire:property version=4
*/
public ConsumerId[] getNetworkConsumerPath() {
ConsumerId[] result = null;
if (networkConsumerIds != null) {
result = networkConsumerIds.toArray(new ConsumerId[0]);
}
return result;
}
public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
if (consumerPath != null) {
for (int i=0; i<consumerPath.length; i++) {
addNetworkConsumerId(consumerPath[i]);
}
}
}
public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
this.lastDeliveredSequenceId = lastDeliveredSequenceId;
}
public long getLastDeliveredSequenceId() {
return lastDeliveredSequenceId;
}
}