blob: 2c12cc3db0ede4e3a1204bd63102f6fde585c18d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.client;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import flex.messaging.Destination;
import flex.messaging.MessageClient;
import flex.messaging.MessageDestination;
import flex.messaging.config.ConfigMap;
import flex.messaging.messages.Message;
import flex.messaging.services.messaging.ThrottleManager;
import flex.messaging.services.messaging.ThrottleManager.ThrottleResult;
import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result;
/**
* The base FlexClientOutboundQueueProcessor implementation used if a custom implementation is not
* specified. Its behavior is very simple. It adds all new messages in order to the tail
* of the outbound queue and flushes all queued messages to the network as quickly as possible.
* It also handles the outbound client-level throttling specified at the destination level.
*/
public class FlexClientOutboundQueueProcessor {
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* The associated FlexClient.
*/
private FlexClient client;
/**
* The last MessageClient messages were flushed to. This is mainly for faster
* lookup.
*/
private MessageClient lastMessageClient;
/**
* The associated endpoint's Id.
*/
private String endpointId;
/**
* Manages throttling of outbound client level messages.
*/
protected OutboundQueueThrottleManager outboundQueueThrottleManager;
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Stores the Id for the outbound queue's endpoint.
*
* @param value The Id for the outbound queue's endpoint.
*/
public void setEndpointId(String value) {
endpointId = value;
}
/**
* Returns the Id for the outbound queue's endpoint.
*
* @return The Id for the outbound queue's endpoint.
*/
public String getEndpointId() {
return endpointId;
}
/**
* Sets the associated FlexClient.
*
* @param value The associated FlexClient.
*/
public void setFlexClient(FlexClient value) {
client = value;
}
/**
* Returns the associated FlexClient.
*
* @return The associated FlexClient.
*/
public FlexClient getFlexClient() {
return client;
}
/**
* Returns the outbound queue throttle manager, or null if one does not exist.
*
* @return The outbound queue throttle manager.
*/
public OutboundQueueThrottleManager getOutboundQueueThrottleManager() {
return outboundQueueThrottleManager;
}
/**
* Utility method to initialize (if necessary) and return an outbound queue
* throttle manager.
*
* @return The outbound queue throttle manager.
*/
public OutboundQueueThrottleManager getOrCreateOutboundQueueThrottleManager() {
if (outboundQueueThrottleManager == null)
outboundQueueThrottleManager = new OutboundQueueThrottleManager(this);
return outboundQueueThrottleManager;
}
/**
* No-op; this default implementation doesn't require custom initialization.
* Subclasses may override to process any custom initialization properties that have been
* defined in the server configuration.
*
* @param properties A ConfigMap containing any custom initialization properties.
*/
public void initialize(ConfigMap properties) {
}
/**
* Always adds a new message to the tail of the queue.
*
* @param outboundQueue The queue of outbound messages.
* @param message The new message to add to the queue.
*/
public void add(List<Message> outboundQueue, Message message) {
outboundQueue.add(message);
}
/**
* Always empties the queue and returns all messages to be sent to the client.
*
* @param outboundQueue The queue of outbound messages.
* @return A FlushResult containing the messages that have been removed from the outbound queue
* to be written to the network and a wait time for the next flush of the outbound queue
* that is the default for the underlying Channel/Endpoint.
*/
public FlushResult flush(List<Message> outboundQueue) {
return flush(null /* no client distinction */, outboundQueue);
}
/**
* Removes all messages in the queue targeted to this specific MessageClient subscription(s) and
* returns them to be sent to the client.
* Overrides should be careful to only return messages for the specified MessageClient.
*
* @param messageClient The specific MessageClient to return messages for.
* @param outboundQueue The queue of outbound messages.
* @return A FlushResult containing the messages that have been removed from the outbound queue
* to be written to the network for this MessageClient.
*/
public FlushResult flush(MessageClient messageClient, List<Message> outboundQueue) {
FlushResult flushResult = new FlushResult();
List<Message> messagesToFlush = null;
for (Iterator<Message> iter = outboundQueue.iterator(); iter.hasNext(); ) {
Message message = iter.next();
if (messageClient == null || (message.getClientId().equals(messageClient.getClientId()))) {
if (isMessageExpired(message)) // Don't flush expired messages.
{
iter.remove();
continue;
}
// If no message client was explicitly provided, get the message client from
// the current message.
MessageClient messageClientForCurrentMessage = messageClient == null ?
getMessageClient(message) : messageClient;
// First, apply the destination level outbound throttling.
ThrottleResult throttleResult =
throttleOutgoingDestinationLevel(messageClientForCurrentMessage, message, false);
Result result = throttleResult.getResult();
// No destination level throttling; check destination-client level throttling.
if (Result.OK == result) {
throttleResult = throttleOutgoingClientLevel(messageClientForCurrentMessage, message, false);
result = throttleResult.getResult();
// If no throttling, simply add the message to the list.
if (Result.OK == result) {
updateMessageFrequencyOutgoing(messageClientForCurrentMessage, message);
if (messagesToFlush == null)
messagesToFlush = new ArrayList<Message>();
messagesToFlush.add(message);
}
// In rest of the policies (which is NONE), simply don't
// add the message to the list.
}
iter.remove();
}
}
flushResult.setMessages(messagesToFlush);
return flushResult;
}
/**
* Utility method to test whether a message has expired or not.
* Messages with a timeToLive value that is shorter than the timespan from the message's
* timestamp up to the current system time will cause this method to return true.
* If there are expired messages in the outbound queue, flush implementations
* should use this helper method to only process and return messages that have
* not yet expired.
*
* @param message The message to test for expiration.
* @return true if the message has a timeToLive value that has expired; otherwise false.
*/
public boolean isMessageExpired(Message message) {
return (message.getTimeToLive() > 0 &&
(System.currentTimeMillis() - message.getTimestamp()) >= message.getTimeToLive());
}
/**
* Attempts to throttle the outgoing message at the destination level.
*
* @param msgClient The client the message is intended for.
* @param message The message to consider to throttle.
* @param buffered Whether the message has already been buffered. In that case,
* parts of regular throttling code is skipped.
* @return The result of throttling attempt.
*/
protected ThrottleResult throttleOutgoingDestinationLevel(
MessageClient msgClient, Message message, boolean buffered) {
ThrottleManager throttleManager = getThrottleManager(msgClient);
if (throttleManager != null) {
// In already buffered messages, don't use ThrottleManager#throttleOutgoingMessage
// to avoid regular throttling handling as the message has already been buffered.
if (buffered)
return throttleManager.throttleDestinationLevel(message, false /*incoming*/);
// Otherwise, regular throttling.
return throttleManager.throttleOutgoingMessage(message);
}
return new ThrottleResult(); // Otherwise, return OK result.
}
/**
* Attempts to throttle the outgoing message at the destination-client level.
*
* @param msgClient The client the message is intended for.
* @param message The message to consider to throttle.
* @param buffered Whether the message has already been buffered. In that case,
* parts of regular throttling code is skipped.
* @return The result of throttling attempt.
*/
protected ThrottleResult throttleOutgoingClientLevel(MessageClient msgClient, Message message, boolean buffered) {
if (outboundQueueThrottleManager != null) // Means client level throttling enabled.
{
ThrottleResult throttleResult = outboundQueueThrottleManager.throttleOutgoingClientLevel(message);
if (!buffered) {
ThrottleManager throttleManager = getThrottleManager(msgClient);
if (throttleManager != null)
throttleManager.handleOutgoingThrottleResult(message, throttleResult, true /*isClientLevel*/);
}
return throttleResult;
}
return new ThrottleResult(); // Otherwise, return OK result.
}
/**
* Returns the message client that the message is intended to.
*
* @param message The message.
* @return The message client that the message is intended to.
*/
protected MessageClient getMessageClient(Message message) {
// First try using the cached message client.
if (lastMessageClient != null && message.getClientId().equals(lastMessageClient.getClientId())) {
return lastMessageClient;
} else // Go ahead with the lookup.
{
lastMessageClient = client.getMessageClient((String) message.getClientId());
return lastMessageClient;
}
}
/**
* Returns the throttle manager associated with the destination the message
* is intended to.
*
* @param msgClient The message client; it can be null.
* @return The throttle manager.
*/
protected ThrottleManager getThrottleManager(MessageClient msgClient) {
Destination destination = msgClient != null ? msgClient.getDestination() : null;
return (destination != null && destination instanceof MessageDestination) ?
((MessageDestination) destination).getThrottleManager() : null;
}
/**
* Updates the outgoing message's message frequency.
*
* @param msgClient The MessageClient that might have been passed to the flush; it can be null.
* @param message The message.
*/
protected void updateMessageFrequencyOutgoing(MessageClient msgClient, Message message) {
// Update the destination level message frequency.
ThrottleManager throttleManager = getThrottleManager(msgClient);
if (throttleManager != null)
throttleManager.updateMessageFrequencyDestinationLevel(false /*incoming*/);
// Update the client level message frequency.
if (outboundQueueThrottleManager != null)
outboundQueueThrottleManager.updateMessageFrequencyOutgoingClientLevel(message);
}
}