blob: 6191a2cdd43f4c9c073ee1ff16d1f4d95c4f135e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.client;
import java.util.concurrent.ConcurrentHashMap;
import flex.messaging.MessageClient.SubscriptionInfo;
import flex.messaging.config.ThrottleSettings.Policy;
import flex.messaging.log.Log;
import flex.messaging.messages.Message;
import flex.messaging.services.messaging.MessageFrequency;
import flex.messaging.services.messaging.ThrottleManager;
import flex.messaging.services.messaging.ThrottleManager.ThrottleResult;
import flex.messaging.util.StringUtils;
/**
* Used to keep track of and limit outbound message rates of a single FlexClient queue.
* An outbound FlexClient queue can contain messages from multiple MessageClients
* across multiple destinations. It can also contain messages for multiple
* subscriptions (for each subtopic/selector) across the same destination for
* the same MessageClient.
*/
public class OutboundQueueThrottleManager {
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs a default outbound queue throttle manager.
*
* @param processor The outbound queue processor that is using this throttle manager.
*/
public OutboundQueueThrottleManager(FlexClientOutboundQueueProcessor processor) {
destinationFrequencies = new ConcurrentHashMap<String, DestinationFrequency>();
this.processor = processor;
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* Map of destination id and destination message frequencies.
*/
protected final ConcurrentHashMap<String, DestinationFrequency> destinationFrequencies;
/**
* The parent queue processor of the throttle manager.
*/
protected final FlexClientOutboundQueueProcessor processor;
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Registers the destination with the outbound throttle manager.
*
* @param destinationId The id of the destination.
* @param outboundMaxClientFrequency The outbound max-client-frequency specified
* at the destination.
* @param outboundPolicy The outbound throttle policy specified at the destination.
*/
public void registerDestination(String destinationId, int outboundMaxClientFrequency, Policy outboundPolicy) {
DestinationFrequency frequency = destinationFrequencies.get(destinationId);
if (frequency == null) {
frequency = new DestinationFrequency(outboundMaxClientFrequency, outboundPolicy);
destinationFrequencies.putIfAbsent(destinationId, frequency);
}
}
/**
* Registers the subscription of a client talking to a destination with the
* specified subscription info.
*
* @param destinationId The destination id.
* @param si The subscription information.
*/
public void registerSubscription(String destinationId, SubscriptionInfo si) {
DestinationFrequency frequency = destinationFrequencies.get(destinationId);
frequency.logMaxFrequencyDuringRegistration(frequency.outboundMaxClientFrequency, si);
}
/**
* Unregisters the subscription.
*
* @param destinationId The destination id.
* @param si The subscription information.
*/
public void unregisterSubscription(String destinationId, SubscriptionInfo si) {
unregisterDestination(destinationId);
}
/**
* Unregisters all subscriptions of the client under the specified destination.
*
* @param destinationId The destination id.
*/
public void unregisterAllSubscriptions(String destinationId) {
unregisterDestination(destinationId);
}
/**
* Attempts to throttle the outgoing message.
*
* @param message The message to consider to throttle.
* @return True if the message was throttled; otherwise false.
*/
public ThrottleResult throttleOutgoingClientLevel(Message message) {
String destinationId = message.getDestination();
if (isDestinationRegistered(destinationId)) {
DestinationFrequency frequency = destinationFrequencies.get(message.getDestination());
int maxFrequency = frequency.getMaxFrequency(message); // Limit to check against.
MessageFrequency messageFrequency = frequency.getMessageFrequency(message); // Message rate of the client.
if (messageFrequency != null) {
ThrottleResult result = messageFrequency.checkLimit(maxFrequency, frequency.outboundPolicy);
return result;
}
}
return new ThrottleResult(); // Otherwise, return OK result.
}
/**
* Updates the outgoing client level message frequency of the message.
*
* @param message The message.
*/
public void updateMessageFrequencyOutgoingClientLevel(Message message) {
String destinationId = message.getDestination();
if (isDestinationRegistered(destinationId)) {
DestinationFrequency frequency = destinationFrequencies.get(message.getDestination());
MessageFrequency messageFrequency = frequency.getMessageFrequency(message);
if (messageFrequency != null)
messageFrequency.updateMessageFrequency();
}
}
//--------------------------------------------------------------------------
//
// Protected Methods
//
//--------------------------------------------------------------------------
/**
* Determines whether the destination has been registered or not.
*
* @param destinationId The destination id.
* @return True if the destination with the specified id has been registered.
*/
protected boolean isDestinationRegistered(String destinationId) {
return destinationFrequencies.containsKey(destinationId);
}
/**
* Unregisters the destination from the outbound throttle manager.
*
* @param destinationId The id of the destination.
*/
protected void unregisterDestination(String destinationId) {
if (isDestinationRegistered(destinationId))
destinationFrequencies.remove(destinationId);
}
//--------------------------------------------------------------------------
//
// Inner Classes
//
//--------------------------------------------------------------------------
/**
* Used to keep track of max-client-frequency and outgoing throttle policy
* specified at the destination. It also keeps track of outbound message
* rates of all MessageClient subscriptions across the destination.
*/
class DestinationFrequency {
protected final int outboundMaxClientFrequency; // destination specified client limit.
protected final MessageFrequency outboundClientFrequency;
protected final Policy outboundPolicy; // destination specified policy.
/**
* Default constructor.
*
* @param outboundMaxClientFrequency The outbound throttling max-client-frequency of the destination.
* @param outboundPolicy The outbound throttling policy of the destination.
*/
DestinationFrequency(int outboundMaxClientFrequency, Policy outboundPolicy) {
this.outboundMaxClientFrequency = outboundMaxClientFrequency;
this.outboundPolicy = outboundPolicy;
outboundClientFrequency = new MessageFrequency(outboundMaxClientFrequency);
}
/**
* Returns the max-client-frequency for the subscription the message is
* intended for (which is simply the max-client-frequency specified at
* the destination).
*
* @param message The message.
* @return The max-frequency for the subscription.
*/
int getMaxFrequency(Message message) {
return outboundMaxClientFrequency;
}
/**
* Given a subscription the message is intended to, returns the message
* rate frequency for that subscription.
*
* @param message The message.
* @return The message frequency for the subscription, if it exists; otherwise null.
*/
MessageFrequency getMessageFrequency(Message message) {
return outboundClientFrequency;
}
/**
* Utility function to log the maxFrequency being used during subscription.
*
* @param maxFrequency The maxFrequency to log.
*/
void logMaxFrequencyDuringRegistration(int maxFrequency, SubscriptionInfo si) {
if (Log.isDebug())
Log.getLogger(ThrottleManager.LOG_CATEGORY).debug("Outbound queue throttle manager for FlexClient '"
+ processor.getFlexClient().getId() + "' is using '" + maxFrequency
+ "' as the throttling limit for its subscription: "
+ StringUtils.NEWLINE + si);
}
}
}