blob: e2a4a918bdb6496c3a1545bf5b137f95d8e221e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging;
import flex.management.runtime.messaging.MessageDestinationControl;
import flex.management.runtime.messaging.services.messaging.SubscriptionManagerControl;
import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl;
import flex.messaging.config.ConfigurationConstants;
import flex.messaging.config.ConfigurationException;
import flex.messaging.config.DestinationSettings;
import flex.messaging.config.ThrottleSettings;
import flex.messaging.config.ConfigMap;
import flex.messaging.config.NetworkSettings;
import flex.messaging.config.ServerSettings;
import flex.messaging.config.ThrottleSettings.Policy;
import flex.messaging.log.LogCategories;
import flex.messaging.services.MessageService;
import flex.messaging.services.Service;
import flex.messaging.services.messaging.SubscriptionManager;
import flex.messaging.services.messaging.RemoteSubscriptionManager;
import flex.messaging.services.messaging.ThrottleManager;
import flex.messaging.services.messaging.MessagingConstants;
import flex.messaging.util.ClassUtil;
/**
* A logical reference to a MessageDestination.
*/
public class MessageDestination extends FactoryDestination {
/** Serialization version identifier for this class. */
static final long serialVersionUID = -2016911808141319012L;
/**
 * Log category for <code>MessageDestination</code>.
 */
public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE;
// Errors
// Error number set on the ConfigurationException that is thrown when a
// configured throttle policy string cannot be parsed.
private static final int UNSUPPORTED_POLICY = 10124;
// Destination properties
// Server settings; created in the constructor, populated by server(ConfigMap)
// and replaceable through setServerSettings.
private transient ServerSettings serverSettings;
// Destination internal
// Both subscription managers are created in the constructor and stopped in stop().
private transient SubscriptionManager subscriptionManager;
private transient RemoteSubscriptionManager remoteSubscriptionManager;
// Only assigned in start(), and only when client or destination throttling
// is enabled; remains null otherwise.
private transient ThrottleManager throttleManager;
// MBean control for this destination; assigned in setupDestinationControl(Service).
private transient MessageDestinationControl controller;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
 * Constructs an unmanaged <code>MessageDestination</code> instance.
 * Equivalent to calling <code>MessageDestination(false)</code>.
 */
public MessageDestination() {
this(false);
}
/**
 * Constructs a <code>MessageDestination</code> with the indicated management
 * and creates its default server settings and its local and remote
 * subscription managers.
 *
 * @param enableManagement <code>true</code> if the <code>MessageDestination</code>
 * is manageable; otherwise <code>false</code>.
 */
public MessageDestination(boolean enableManagement) {
    super(enableManagement);

    // Start out with default server settings.
    this.serverSettings = new ServerSettings();

    // Managers bound to this destination.
    this.subscriptionManager = new SubscriptionManager(this);
    this.remoteSubscriptionManager = new RemoteSubscriptionManager(this);
}
//--------------------------------------------------------------------------
//
// Initialize, validate, start, and stop methods.
//
//--------------------------------------------------------------------------
/**
 * Initializes the <code>MessageDestination</code> with the properties.
 * The superclass is always initialized; the network and server sections are
 * only processed when <code>properties</code> is non-null and non-empty.
 * If subclasses override, they must call <code>super.initialize()</code>.
 *
 * @param id The id of the destination.
 * @param properties Properties for the <code>MessageDestination</code>.
 */
@Override
public void initialize(String id, ConfigMap properties) {
    super.initialize(id, properties);

    if (properties != null && properties.size() != 0) {
        // Network section first, then the server section.
        network(properties);
        server(properties);
    }
}
/**
 * Starts the destination. When the network settings carry throttle settings
 * with client or destination throttling enabled, a throttle manager is
 * created, configured and started before the superclass is started.
 */
@Override
public void start() {
    ThrottleSettings settings = networkSettings.getThrottleSettings();
    boolean throttlingEnabled = settings != null
            && (settings.isClientThrottleEnabled() || settings.isDestinationThrottleEnabled());

    // The throttle manager is only created when it is actually needed.
    if (throttlingEnabled) {
        settings.setDestinationName(getId());
        throttleManager = createThrottleManager();
        throttleManager.setThrottleSettings(settings);
        throttleManager.start();
    }

    super.start();
}
/**
 * Stops the subscription, remote subscription, and throttle managers when
 * the destination is started, and then calls the super class's stop.
 */
@Override
public void stop() {
    if (isStarted())
        stopManagers();
    super.stop();
}

/**
 * Stops the managers owned by this destination; the throttle manager is
 * skipped when none was created.
 */
private void stopManagers() {
    subscriptionManager.stop();
    remoteSubscriptionManager.stop();
    if (throttleManager != null)
        throttleManager.stop();
}
//--------------------------------------------------------------------------
//
// Public Getters and Setters for Destination properties
//
//--------------------------------------------------------------------------
/**
 * Sets the <code>NetworkSettings</code> of the <code>MessageDestination</code>
 * and, when they specify a positive subscription timeout, pushes that timeout
 * (converted to milliseconds) to the subscription manager.
 *
 * @param networkSettings The <code>NetworkSettings</code> of the <code>MessageDestination</code>
 */
@Override
public void setNetworkSettings(NetworkSettings networkSettings) {
    super.setNetworkSettings(networkSettings);

    long timeoutMinutes = networkSettings.getSubscriptionTimeoutMinutes();
    if (timeoutMinutes <= 0)
        return; // No timeout configured; leave the subscription manager untouched.

    // Minutes -> milliseconds.
    subscriptionManager.setSubscriptionTimeoutMillis(timeoutMinutes * 60L * 1000L);
}
/**
 * Returns the <code>ServerSettings</code> of the <code>MessageDestination</code>.
 * The returned object is the instance held by this destination, not a copy.
 *
 * @return The <code>ServerSettings</code> of the <code>MessageDestination</code>.
 */
public ServerSettings getServerSettings() {
return serverSettings;
}
/**
 * Sets the <code>ServerSettings</code> of the <code>MessageDestination</code>.
 * The reference is stored as-is without a null check; note that
 * <code>server(ConfigMap)</code> dereferences this field when a server
 * section is configured.
 *
 * @param serverSettings The <code>ServerSettings</code> of the <code>MessageDestination</code>
 */
public void setServerSettings(ServerSettings serverSettings) {
this.serverSettings = serverSettings;
}
/**
 * Casts the <code>Service</code> into <code>MessageService</code>
 * and calls super.setService. A service of any other type makes the
 * cast fail with a <code>ClassCastException</code>.
 *
 * @param service The <code>Service</code> managing this <code>Destination</code>.
 */
@Override
public void setService(Service service) {
    // The cast enforces that only a MessageService can manage this destination.
    super.setService((MessageService) service);
}
//--------------------------------------------------------------------------
//
// Other Public Methods
//
//--------------------------------------------------------------------------
/**
 * Returns a <code>ConfigMap</code> of destination properties that the client
 * needs. This includes the properties returned by the superclass
 * implementation of <code>describeDestination(boolean)</code>, plus the
 * outbound throttling policy that the edge server might need.
 * <p>
 * The outbound policy is only added when a throttle manager exists and its
 * outbound policy is neither <code>null</code> nor <code>Policy.NONE</code>.
 * Missing <code>properties</code> and <code>network</code> sections are
 * created on demand.
 *
 * @param onlyReliable Determines whether only reliable destination configuration should be returned.
 * @return A <code>ConfigMap</code> of destination properties that the client needs,
 * or <code>null</code> when the superclass returns <code>null</code>.
 */
@Override
public ConfigMap describeDestination(boolean onlyReliable) {
    ConfigMap destinationConfig = super.describeDestination(onlyReliable);
    if (destinationConfig == null)
        return null;

    if (throttleManager == null)
        return destinationConfig;

    Policy outboundPolicy = throttleManager.getOutboundPolicy();
    if (outboundPolicy == null || outboundPolicy == Policy.NONE)
        return destinationConfig;

    // Add the outbound throttle policy to network properties section as appropriate.
    ConfigMap properties = destinationConfig.getPropertyAsMap(ConfigurationConstants.PROPERTIES_ELEMENT, null);
    if (properties == null) {
        properties = new ConfigMap();
        destinationConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
    }

    ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
    if (network == null) {
        network = new ConfigMap();
        properties.addProperty(NetworkSettings.NETWORK_ELEMENT, network);
    }

    ConfigMap throttleOutbound = new ConfigMap();
    // Emit the policy value that was validated above rather than querying the
    // throttle manager a second time.
    throttleOutbound.addProperty(ThrottleSettings.ELEMENT_POLICY, outboundPolicy.toString());
    network.addProperty(ThrottleSettings.ELEMENT_OUTBOUND, throttleOutbound);

    return destinationConfig;
}
/**
 * Returns the <code>SubscriptionManager</code> that was created for this
 * destination in the constructor.
 *
 * @return The <code>SubscriptionManager</code> of the <code>MessageDestination</code>.
 */
public SubscriptionManager getSubscriptionManager() {
return subscriptionManager;
}
/**
 * Returns the <code>RemoteSubscriptionManager</code> that was created for
 * this destination in the constructor.
 *
 * @return The <code>RemoteSubscriptionManager</code> of the <code>MessageDestination</code>.
 */
public RemoteSubscriptionManager getRemoteSubscriptionManager() {
return remoteSubscriptionManager;
}
/**
 * Returns the <code>ThrottleManager</code> of this destination. The manager
 * is only created by <code>start()</code> when client or destination
 * throttling is enabled, so this may return <code>null</code>.
 *
 * @return The <code>ThrottleManager</code>, or <code>null</code> if none was created.
 */
public ThrottleManager getThrottleManager() {
return throttleManager;
}
/**
 * Compares this destination with another object. Two destinations are equal
 * when both their service types and their ids match; <code>null</code> only
 * matches <code>null</code>.
 *
 * @param o The object to compare with.
 * @return <code>true</code> if <code>o</code> is a <code>Destination</code>
 * with the same service type and id; otherwise <code>false</code>.
 */
@Override
public boolean equals(Object o) {
    if (!(o instanceof Destination))
        return false;

    Destination other = (Destination) o;

    String otherType = other.getServiceType();
    String thisType = getServiceType();
    boolean sameType = (otherType == null) ? (thisType == null) : otherType.equals(thisType);
    if (!sameType)
        return false;

    String otherId = other.getId();
    String thisId = getId();
    return (otherId == null) ? (thisId == null) : otherId.equals(thisId);
}
/**
 * Computes a hash code from the service type and the id, the same two
 * properties that <code>equals</code> compares. A <code>null</code> value
 * contributes 0.
 *
 * @return The hash code of the destination.
 */
@Override
public int hashCode() {
    String type = getServiceType();
    String id = getId();
    int typeHash = (type == null) ? 0 : type.hashCode();
    int idHash = (id == null) ? 0 : id.hashCode();
    return typeHash * 100003 + idHash;
}
/**
 * Returns the service type and the id of the destination joined by '#'.
 *
 * @return A string of the form <code>serviceType#id</code>.
 */
@Override
public String toString() {
    String type = getServiceType();
    String id = getId();
    return type + "#" + id;
}
//--------------------------------------------------------------------------
//
// Protected/Private Methods
//
//--------------------------------------------------------------------------
/**
 * Creates the throttle manager for this destination. When a service and its
 * message broker are available, the throttle manager class configured on the
 * broker is instantiated; in every other case, including any failure while
 * instantiating that class, a default <code>ThrottleManager</code> is returned.
 *
 * @return A <code>ThrottleManager</code> instance, never <code>null</code>.
 */
protected ThrottleManager createThrottleManager() {
    ThrottleManager custom = null;

    Service service = getService();
    if (service != null && service.getMessageBroker() != null) {
        try {
            Class<? extends ThrottleManager> managerClass = service.getMessageBroker().getThrottleManagerClass();
            Object candidate = ClassUtil.createDefaultInstance(managerClass, null);
            if (candidate instanceof ThrottleManager)
                custom = (ThrottleManager) candidate;
        } catch (Throwable t) {
            // NOWARN - deliberately ignored; the default manager is used instead.
        }
    }

    return (custom != null) ? custom : new ThrottleManager(); // Fall back to the default.
}
/**
 * Reads the network section of the destination properties, if present, and
 * applies the subscription timeout and throttle configuration to the
 * destination's network settings.
 *
 * @param properties Properties for the <code>MessageDestination</code>.
 */
protected void network(ConfigMap properties) {
    ConfigMap networkConfig = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
    if (networkConfig != null) {
        // Get implementation specific network settings, including subclasses!
        NetworkSettings settings = getNetworkSettings();

        // Subscriber timeout: prefer subscription-timeout-minutes; the sentinel
        // default signals that the legacy session-timeout should be read instead.
        final int notConfigured = -999999;
        int timeoutMinutes = networkConfig.getPropertyAsInt(NetworkSettings.SUBSCRIPTION_TIMEOUT_MINUTES, notConfigured);
        if (timeoutMinutes == notConfigured)
            timeoutMinutes = networkConfig.getPropertyAsInt(NetworkSettings.SESSION_TIMEOUT, NetworkSettings.DEFAULT_TIMEOUT);
        settings.setSubscriptionTimeoutMinutes(timeoutMinutes);

        // Throttle settings
        ThrottleSettings throttleSettings = settings.getThrottleSettings();
        throttleSettings.setDestinationName(getId());
        throttle(throttleSettings, networkConfig);

        // Re-apply so that the subscription manager picks up the timeout.
        setNetworkSettings(settings);
    }
}
/**
 * Copies the inbound and outbound throttle configuration from the network
 * section into the given throttle settings. A direction whose section is
 * absent is left untouched; a frequency that is not specified is set to 0.
 *
 * @param ts The <code>ThrottleSettings</code> to populate.
 * @param network The network section of the destination properties.
 */
protected void throttle(ThrottleSettings ts, ConfigMap network) {
    ConfigMap inboundConfig = network.getPropertyAsMap(ThrottleSettings.ELEMENT_INBOUND, null);
    if (inboundConfig != null) {
        ts.setInboundPolicy(getPolicyFromThrottleSettings(inboundConfig));
        ts.setIncomingDestinationFrequency(inboundConfig.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0));
        ts.setIncomingClientFrequency(inboundConfig.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0));
    }

    ConfigMap outboundConfig = network.getPropertyAsMap(ThrottleSettings.ELEMENT_OUTBOUND, null);
    if (outboundConfig != null) {
        ts.setOutboundPolicy(getPolicyFromThrottleSettings(outboundConfig));
        ts.setOutgoingDestinationFrequency(outboundConfig.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0));
        ts.setOutgoingClientFrequency(outboundConfig.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0));
    }
}
/**
 * Reads the throttle policy from an inbound or outbound throttle section.
 *
 * @param settings The inbound or outbound throttle section.
 * @return The parsed policy, or <code>Policy.NONE</code> when no policy is specified.
 * @throws ConfigurationException with the <code>UNSUPPORTED_POLICY</code>
 * error number if the policy string cannot be parsed; the original parse
 * failure is attached as its cause.
 */
private ThrottleSettings.Policy getPolicyFromThrottleSettings(ConfigMap settings) {
    String policyString = settings.getPropertyAsString(ThrottleSettings.ELEMENT_POLICY, null);
    if (policyString == null)
        return ThrottleSettings.Policy.NONE;

    try {
        return ThrottleSettings.parsePolicy(policyString);
    } catch (ConfigurationException exception) {
        // Report the failure in terms of this destination and the offending value.
        ConfigurationException ce = new ConfigurationException();
        ce.setMessage(UNSUPPORTED_POLICY, new Object[]{getId(), policyString});
        // Keep the original parse failure so that its details are not lost.
        ce.initCause(exception);
        throw ce;
    }
}
/**
 * Reads the server section of the destination properties, if present, and
 * applies its values to the server settings. Values that are not specified
 * fall back to the defaults passed to the property lookups below; the
 * priority is only applied when one is specified.
 *
 * @param properties Properties for the <code>MessageDestination</code>.
 */
protected void server(ConfigMap properties) {
    ConfigMap serverConfig = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null);
    if (serverConfig == null)
        return;

    serverSettings.setMessageTTL(
            serverConfig.getPropertyAsLong(MessagingConstants.TIME_TO_LIVE_ELEMENT, -1));
    serverSettings.setDurable(
            serverConfig.getPropertyAsBoolean(MessagingConstants.IS_DURABLE_ELEMENT, false));
    serverSettings.setAllowSubtopics(
            serverConfig.getPropertyAsBoolean(MessagingConstants.ALLOW_SUBTOPICS_ELEMENT, false));
    serverSettings.setDisallowWildcardSubtopics(
            serverConfig.getPropertyAsBoolean(MessagingConstants.DISALLOW_WILDCARD_SUBTOPICS_ELEMENT, false));

    // -1 marks "not specified"; in that case the current priority is kept.
    int configuredPriority = serverConfig.getPropertyAsInt(MessagingConstants.MESSAGE_PRIORITY, -1);
    if (configuredPriority != -1)
        serverSettings.setPriority(configuredPriority);

    serverSettings.setSubtopicSeparator(
            serverConfig.getPropertyAsString(MessagingConstants.SUBTOPIC_SEPARATOR_ELEMENT, MessagingConstants.DEFAULT_SUBTOPIC_SEPARATOR));
    serverSettings.setBroadcastRoutingMode(
            serverConfig.getPropertyAsString(MessagingConstants.CLUSTER_MESSAGE_ROUTING, "server-to-server"));
}
/**
 * Returns the log category of the <code>MessageDestination</code>, which is
 * the value of <code>LOG_CATEGORY</code>.
 *
 * @return The log category of the component.
 */
@Override
protected String getLogCategory() {
return LOG_CATEGORY;
}
/**
 * Invoked automatically to allow the <code>MessageDestination</code> to setup its corresponding
 * MBean control. The destination control is created and registered first, and
 * then the controls of the throttle manager (if one exists) and of the
 * subscription manager are set up beneath it.
 *
 * @param service The <code>Service</code> that manages this <code>MessageDestination</code>.
 */
@Override
protected void setupDestinationControl(Service service) {
    MessageDestinationControl destinationControl = new MessageDestinationControl(this, service.getControl());
    controller = destinationControl;

    destinationControl.register();
    setControl(destinationControl);

    // Manager controls hang off the destination control.
    setupThrottleManagerControl(destinationControl);
    setupSubscriptionManagerControl(destinationControl);
}
/**
 * Creates and registers the MBean control of the throttle manager and links
 * it to the destination control. Does nothing when no throttle manager exists.
 *
 * @param destinationControl The control of this <code>MessageDestination</code>.
 */
protected void setupThrottleManagerControl(MessageDestinationControl destinationControl) {
    if (throttleManager == null)
        return; // Nothing to manage.

    ThrottleManagerControl managerControl = new ThrottleManagerControl(throttleManager, destinationControl);
    managerControl.register();
    throttleManager.setControl(managerControl);
    throttleManager.setManaged(true);
    destinationControl.setThrottleManager(managerControl.getObjectName());
}
/**
 * Creates and registers the MBean control of the subscription manager and
 * links it to the destination control.
 *
 * @param destinationControl The control of this <code>MessageDestination</code>.
 */
private void setupSubscriptionManagerControl(MessageDestinationControl destinationControl) {
    SubscriptionManager manager = getSubscriptionManager();

    SubscriptionManagerControl managerControl = new SubscriptionManagerControl(manager, destinationControl);
    managerControl.register();

    manager.setControl(managerControl);
    manager.setManaged(true);
    destinationControl.setSubscriptionManager(managerControl.getObjectName());
}
}