| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.broker.region; |
| |
| import java.io.IOException; |
| import javax.jms.ResourceAllocationException; |
| import org.apache.activemq.advisory.AdvisorySupport; |
| import org.apache.activemq.broker.Broker; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.activemq.broker.ConnectionContext; |
| import org.apache.activemq.broker.ProducerBrokerExchange; |
| import org.apache.activemq.broker.region.policy.DeadLetterStrategy; |
| import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageDispatchNotification; |
| import org.apache.activemq.command.ProducerInfo; |
| import org.apache.activemq.state.ProducerState; |
| import org.apache.activemq.store.MessageStore; |
| import org.apache.activemq.usage.MemoryUsage; |
| import org.apache.activemq.usage.SystemUsage; |
| import org.apache.activemq.usage.Usage; |
| import org.slf4j.Logger; |
| |
| /** |
| * |
| */ |
| public abstract class BaseDestination implements Destination { |
| /** |
| * The maximum number of messages to page in to the destination from |
| * persistent storage |
| */ |
| public static final int MAX_PAGE_SIZE = 200; |
| public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; |
| public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; |
| public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; |
| public static final int MAX_PRODUCERS_TO_AUDIT = 64; |
| public static final int MAX_AUDIT_DEPTH = 2048; |
| |
| protected final ActiveMQDestination destination; |
| protected final Broker broker; |
| protected final MessageStore store; |
| protected SystemUsage systemUsage; |
| protected MemoryUsage memoryUsage; |
| private boolean producerFlowControl = true; |
| protected boolean warnOnProducerFlowControl = true; |
| protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; |
| |
| private int maxProducersToAudit = 1024; |
| private int maxAuditDepth = 2048; |
| private boolean enableAudit = true; |
| private int maxPageSize = MAX_PAGE_SIZE; |
| private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; |
| private boolean useCache = true; |
| private int minimumMessageSize = 1024; |
| private boolean lazyDispatch = false; |
| private boolean advisoryForSlowConsumers; |
| private boolean advisdoryForFastProducers; |
| private boolean advisoryForDiscardingMessages; |
| private boolean advisoryWhenFull; |
| private boolean advisoryForDelivery; |
| private boolean advisoryForConsumed; |
| private boolean sendAdvisoryIfNoConsumers; |
| protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); |
| protected final BrokerService brokerService; |
| protected final Broker regionBroker; |
| protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; |
| protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; |
| private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; |
| protected int cursorMemoryHighWaterMark = 70; |
| protected int storeUsageHighWaterMark = 100; |
| private SlowConsumerStrategy slowConsumerStrategy; |
| private boolean prioritizedMessages; |
| private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; |
| private boolean gcIfInactive; |
| private long lastActiveTime=0l; |
| private boolean reduceMemoryFootprint = false; |
| |
| /** |
| * @param broker |
| * @param store |
| * @param destination |
| * @param parentStats |
| * @throws Exception |
| */ |
| public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { |
| this.brokerService = brokerService; |
| this.broker = brokerService.getBroker(); |
| this.store = store; |
| this.destination = destination; |
| // let's copy the enabled property from the parent DestinationStatistics |
| this.destinationStatistics.setEnabled(parentStats.isEnabled()); |
| this.destinationStatistics.setParent(parentStats); |
| this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); |
| this.memoryUsage = this.systemUsage.getMemoryUsage(); |
| this.memoryUsage.setUsagePortion(1.0f); |
| this.regionBroker = brokerService.getRegionBroker(); |
| } |
| |
| /** |
| * initialize the destination |
| * |
| * @throws Exception |
| */ |
| public void initialize() throws Exception { |
| // Let the store know what usage manager we are using so that he can |
| // flush messages to disk when usage gets high. |
| if (store != null) { |
| store.setMemoryUsage(this.memoryUsage); |
| } |
| } |
| |
| /** |
| * @return the producerFlowControl |
| */ |
| public boolean isProducerFlowControl() { |
| return producerFlowControl; |
| } |
| |
| /** |
| * @param producerFlowControl the producerFlowControl to set |
| */ |
| public void setProducerFlowControl(boolean producerFlowControl) { |
| this.producerFlowControl = producerFlowControl; |
| } |
| |
| /** |
| * Set's the interval at which warnings about producers being blocked by |
| * resource usage will be triggered. Values of 0 or less will disable |
| * warnings |
| * |
| * @param blockedProducerWarningInterval the interval at which warning about |
| * blocked producers will be triggered. |
| */ |
| public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { |
| this.blockedProducerWarningInterval = blockedProducerWarningInterval; |
| } |
| |
| /** |
| * |
| * @return the interval at which warning about blocked producers will be |
| * triggered. |
| */ |
| public long getBlockedProducerWarningInterval() { |
| return blockedProducerWarningInterval; |
| } |
| |
| /** |
| * @return the maxProducersToAudit |
| */ |
| public int getMaxProducersToAudit() { |
| return maxProducersToAudit; |
| } |
| |
| /** |
| * @param maxProducersToAudit the maxProducersToAudit to set |
| */ |
| public void setMaxProducersToAudit(int maxProducersToAudit) { |
| this.maxProducersToAudit = maxProducersToAudit; |
| } |
| |
| /** |
| * @return the maxAuditDepth |
| */ |
| public int getMaxAuditDepth() { |
| return maxAuditDepth; |
| } |
| |
| /** |
| * @param maxAuditDepth the maxAuditDepth to set |
| */ |
| public void setMaxAuditDepth(int maxAuditDepth) { |
| this.maxAuditDepth = maxAuditDepth; |
| } |
| |
| /** |
| * @return the enableAudit |
| */ |
| public boolean isEnableAudit() { |
| return enableAudit; |
| } |
| |
| /** |
| * @param enableAudit the enableAudit to set |
| */ |
| public void setEnableAudit(boolean enableAudit) { |
| this.enableAudit = enableAudit; |
| } |
| |
| public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { |
| destinationStatistics.getProducers().increment(); |
| this.lastActiveTime=0l; |
| } |
| |
| public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { |
| destinationStatistics.getProducers().decrement(); |
| } |
| |
| public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ |
| destinationStatistics.getConsumers().increment(); |
| this.lastActiveTime=0l; |
| } |
| |
| public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ |
| destinationStatistics.getConsumers().decrement(); |
| } |
| |
| |
| public final MemoryUsage getMemoryUsage() { |
| return memoryUsage; |
| } |
| |
| public DestinationStatistics getDestinationStatistics() { |
| return destinationStatistics; |
| } |
| |
| public ActiveMQDestination getActiveMQDestination() { |
| return destination; |
| } |
| |
| public final String getName() { |
| return getActiveMQDestination().getPhysicalName(); |
| } |
| |
| public final MessageStore getMessageStore() { |
| return store; |
| } |
| |
| public final boolean isActive() { |
| return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; |
| } |
| |
| public int getMaxPageSize() { |
| return maxPageSize; |
| } |
| |
| public void setMaxPageSize(int maxPageSize) { |
| this.maxPageSize = maxPageSize; |
| } |
| |
| public int getMaxBrowsePageSize() { |
| return this.maxBrowsePageSize; |
| } |
| |
| public void setMaxBrowsePageSize(int maxPageSize) { |
| this.maxBrowsePageSize = maxPageSize; |
| } |
| |
| public int getMaxExpirePageSize() { |
| return this.maxExpirePageSize; |
| } |
| |
| public void setMaxExpirePageSize(int maxPageSize) { |
| this.maxExpirePageSize = maxPageSize; |
| } |
| |
| public void setExpireMessagesPeriod(long expireMessagesPeriod) { |
| this.expireMessagesPeriod = expireMessagesPeriod; |
| } |
| |
| public long getExpireMessagesPeriod() { |
| return expireMessagesPeriod; |
| } |
| |
| public boolean isUseCache() { |
| return useCache; |
| } |
| |
| public void setUseCache(boolean useCache) { |
| this.useCache = useCache; |
| } |
| |
| public int getMinimumMessageSize() { |
| return minimumMessageSize; |
| } |
| |
| public void setMinimumMessageSize(int minimumMessageSize) { |
| this.minimumMessageSize = minimumMessageSize; |
| } |
| |
| public boolean isLazyDispatch() { |
| return lazyDispatch; |
| } |
| |
| public void setLazyDispatch(boolean lazyDispatch) { |
| this.lazyDispatch = lazyDispatch; |
| } |
| |
| protected long getDestinationSequenceId() { |
| return regionBroker.getBrokerSequenceId(); |
| } |
| |
| /** |
| * @return the advisoryForSlowConsumers |
| */ |
| public boolean isAdvisoryForSlowConsumers() { |
| return advisoryForSlowConsumers; |
| } |
| |
| /** |
| * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set |
| */ |
| public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { |
| this.advisoryForSlowConsumers = advisoryForSlowConsumers; |
| } |
| |
| /** |
| * @return the advisoryForDiscardingMessages |
| */ |
| public boolean isAdvisoryForDiscardingMessages() { |
| return advisoryForDiscardingMessages; |
| } |
| |
| /** |
| * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to |
| * set |
| */ |
| public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { |
| this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; |
| } |
| |
| /** |
| * @return the advisoryWhenFull |
| */ |
| public boolean isAdvisoryWhenFull() { |
| return advisoryWhenFull; |
| } |
| |
| /** |
| * @param advisoryWhenFull the advisoryWhenFull to set |
| */ |
| public void setAdvisoryWhenFull(boolean advisoryWhenFull) { |
| this.advisoryWhenFull = advisoryWhenFull; |
| } |
| |
| /** |
| * @return the advisoryForDelivery |
| */ |
| public boolean isAdvisoryForDelivery() { |
| return advisoryForDelivery; |
| } |
| |
| /** |
| * @param advisoryForDelivery the advisoryForDelivery to set |
| */ |
| public void setAdvisoryForDelivery(boolean advisoryForDelivery) { |
| this.advisoryForDelivery = advisoryForDelivery; |
| } |
| |
| /** |
| * @return the advisoryForConsumed |
| */ |
| public boolean isAdvisoryForConsumed() { |
| return advisoryForConsumed; |
| } |
| |
| /** |
| * @param advisoryForConsumed the advisoryForConsumed to set |
| */ |
| public void setAdvisoryForConsumed(boolean advisoryForConsumed) { |
| this.advisoryForConsumed = advisoryForConsumed; |
| } |
| |
| /** |
| * @return the advisdoryForFastProducers |
| */ |
| public boolean isAdvisdoryForFastProducers() { |
| return advisdoryForFastProducers; |
| } |
| |
| /** |
| * @param advisdoryForFastProducers the advisdoryForFastProducers to set |
| */ |
| public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { |
| this.advisdoryForFastProducers = advisdoryForFastProducers; |
| } |
| |
| public boolean isSendAdvisoryIfNoConsumers() { |
| return sendAdvisoryIfNoConsumers; |
| } |
| |
| public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { |
| this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; |
| } |
| |
| /** |
| * @return the dead letter strategy |
| */ |
| public DeadLetterStrategy getDeadLetterStrategy() { |
| return deadLetterStrategy; |
| } |
| |
| /** |
| * set the dead letter strategy |
| * |
| * @param deadLetterStrategy |
| */ |
| public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { |
| this.deadLetterStrategy = deadLetterStrategy; |
| } |
| |
| public int getCursorMemoryHighWaterMark() { |
| return this.cursorMemoryHighWaterMark; |
| } |
| |
| public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { |
| this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; |
| } |
| |
| /** |
| * called when message is consumed |
| * |
| * @param context |
| * @param messageReference |
| */ |
| public void messageConsumed(ConnectionContext context, MessageReference messageReference) { |
| if (advisoryForConsumed) { |
| broker.messageConsumed(context, messageReference); |
| } |
| } |
| |
| /** |
| * Called when message is delivered to the broker |
| * |
| * @param context |
| * @param messageReference |
| */ |
| public void messageDelivered(ConnectionContext context, MessageReference messageReference) { |
| if (advisoryForDelivery) { |
| broker.messageDelivered(context, messageReference); |
| } |
| } |
| |
| /** |
| * Called when a message is discarded - e.g. running low on memory This will |
| * happen only if the policy is enabled - e.g. non durable topics |
| * |
| * @param context |
| * @param messageReference |
| */ |
| public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { |
| if (advisoryForDiscardingMessages) { |
| broker.messageDiscarded(context, sub, messageReference); |
| } |
| } |
| |
| /** |
| * Called when there is a slow consumer |
| * |
| * @param context |
| * @param subs |
| */ |
| public void slowConsumer(ConnectionContext context, Subscription subs) { |
| if (advisoryForSlowConsumers) { |
| broker.slowConsumer(context, this, subs); |
| } |
| if (slowConsumerStrategy != null) { |
| slowConsumerStrategy.slowConsumer(context, subs); |
| } |
| } |
| |
| /** |
| * Called to notify a producer is too fast |
| * |
| * @param context |
| * @param producerInfo |
| */ |
| public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { |
| if (advisdoryForFastProducers) { |
| broker.fastProducer(context, producerInfo); |
| } |
| } |
| |
| /** |
| * Called when a Usage reaches a limit |
| * |
| * @param context |
| * @param usage |
| */ |
| public void isFull(ConnectionContext context, Usage usage) { |
| if (advisoryWhenFull) { |
| broker.isFull(context, this, usage); |
| } |
| } |
| |
| public void dispose(ConnectionContext context) throws IOException { |
| if (this.store != null) { |
| this.store.removeAllMessages(context); |
| this.store.dispose(context); |
| } |
| this.destinationStatistics.setParent(null); |
| this.memoryUsage.stop(); |
| } |
| |
| /** |
| * Provides a hook to allow messages with no consumer to be processed in |
| * some way - such as to send to a dead letter queue or something.. |
| */ |
| protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { |
| if (!msg.isPersistent()) { |
| if (isSendAdvisoryIfNoConsumers()) { |
| // allow messages with no consumers to be dispatched to a dead |
| // letter queue |
| if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { |
| |
| Message message = msg.copy(); |
| // The original destination and transaction id do not get |
| // filled when the message is first sent, |
| // it is only populated if the message is routed to another |
| // destination like the DLQ |
| if (message.getOriginalDestination() != null) { |
| message.setOriginalDestination(message.getDestination()); |
| } |
| if (message.getOriginalTransactionId() != null) { |
| message.setOriginalTransactionId(message.getTransactionId()); |
| } |
| |
| ActiveMQTopic advisoryTopic; |
| if (destination.isQueue()) { |
| advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); |
| } else { |
| advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); |
| } |
| message.setDestination(advisoryTopic); |
| message.setTransactionId(null); |
| |
| // Disable flow control for this since since we don't want |
| // to block. |
| boolean originalFlowControl = context.isProducerFlowControl(); |
| try { |
| context.setProducerFlowControl(false); |
| ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); |
| producerExchange.setMutable(false); |
| producerExchange.setConnectionContext(context); |
| producerExchange.setProducerState(new ProducerState(new ProducerInfo())); |
| context.getBroker().send(producerExchange, message); |
| } finally { |
| context.setProducerFlowControl(originalFlowControl); |
| } |
| |
| } |
| } |
| } |
| } |
| |
| public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { |
| } |
| |
| public final int getStoreUsageHighWaterMark() { |
| return this.storeUsageHighWaterMark; |
| } |
| |
| public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { |
| this.storeUsageHighWaterMark = storeUsageHighWaterMark; |
| } |
| |
| protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { |
| waitForSpace(context, usage, 100, warning); |
| } |
| |
| protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { |
| if (systemUsage.isSendFailIfNoSpace()) { |
| getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning); |
| throw new ResourceAllocationException(warning); |
| } |
| if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { |
| if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { |
| getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning); |
| throw new ResourceAllocationException(warning); |
| } |
| } else { |
| long start = System.currentTimeMillis(); |
| long nextWarn = start; |
| while (!usage.waitForSpace(1000, highWaterMark)) { |
| if (context.getStopping().get()) { |
| throw new IOException("Connection closed, send aborted."); |
| } |
| |
| long now = System.currentTimeMillis(); |
| if (now >= nextWarn) { |
| getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); |
| nextWarn = now + blockedProducerWarningInterval; |
| } |
| } |
| } |
| } |
| |
| protected abstract Logger getLog(); |
| |
| public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { |
| this.slowConsumerStrategy = slowConsumerStrategy; |
| } |
| |
| public SlowConsumerStrategy getSlowConsumerStrategy() { |
| return this.slowConsumerStrategy; |
| } |
| |
| |
| public boolean isPrioritizedMessages() { |
| return this.prioritizedMessages; |
| } |
| |
| public void setPrioritizedMessages(boolean prioritizedMessages) { |
| this.prioritizedMessages = prioritizedMessages; |
| if (store != null) { |
| store.setPrioritizedMessages(prioritizedMessages); |
| } |
| } |
| |
| /** |
| * @return the inactiveTimoutBeforeGC |
| */ |
| public long getInactiveTimoutBeforeGC() { |
| return this.inactiveTimoutBeforeGC; |
| } |
| |
| /** |
| * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set |
| */ |
| public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { |
| this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC; |
| } |
| |
| /** |
| * @return the gcIfInactive |
| */ |
| public boolean isGcIfInactive() { |
| return this.gcIfInactive; |
| } |
| |
| /** |
| * @param gcIfInactive the gcIfInactive to set |
| */ |
| public void setGcIfInactive(boolean gcIfInactive) { |
| this.gcIfInactive = gcIfInactive; |
| } |
| |
| public void markForGC(long timeStamp) { |
| if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false |
| && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) { |
| this.lastActiveTime = timeStamp; |
| } |
| } |
| |
| public boolean canGC() { |
| boolean result = false; |
| if (isGcIfInactive()&& this.lastActiveTime != 0l) { |
| if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) { |
| result = true; |
| } |
| } |
| return result; |
| } |
| |
| public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { |
| this.reduceMemoryFootprint = reduceMemoryFootprint; |
| } |
| |
| protected boolean isReduceMemoryFootprint() { |
| return this.reduceMemoryFootprint; |
| } |
| } |