blob: 7d7d1eec832e01a6bf418b7927b8a74eada90204 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.services;
import flex.management.ManageableComponent;
import flex.management.runtime.messaging.MessageBrokerControl;
import flex.messaging.Destination;
import flex.messaging.MessageBroker;
import flex.messaging.MessageException;
import flex.messaging.cluster.ClusterManager;
import flex.messaging.config.ConfigMap;
import flex.messaging.config.ConfigurationConstants;
import flex.messaging.config.ConfigurationException;
import flex.messaging.endpoints.Endpoint;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* This is the default implementation of <code>Service</code>, which provides a
* convenient base for behavior and associations common to all Services.
*/
public abstract class AbstractService extends ManageableComponent implements Service
{
/** Log category for <code>AbstractService</code>.*/
public static final String LOG_CATEGORY = LogCategories.SERVICE_GENERAL;
/**
* Log category that captures startup information for service's destinations.
*/
public static final String LOG_CATEGORY_STARTUP_DESTINATION = LogCategories.STARTUP_DESTINATION;
// Errors
protected static final int UNKNOWN_MESSAGE_TYPE = 10454;
// AbstractService's properties
protected Map<String, String> adapterClasses;
protected String defaultAdapterId;
protected List<String> defaultChannels;
protected Map<String, Destination> destinations;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs an unmanaged <code>AbstractService</code>.
*/
public AbstractService()
{
this(false);
}
/**
* Constructs an <code>AbstractService</code> with the indicated management.
*
* @param enableManagement <code>true</code> if the <code>AbstractService</code>
* is manageable; otherwise <code>false</code>.
*/
public AbstractService(boolean enableManagement)
{
super(enableManagement);
adapterClasses = new HashMap<String, String>();
destinations = new ConcurrentHashMap<String, Destination>();
}
//--------------------------------------------------------------------------
//
// Initialize, validate, start, and stop methods.
//
//--------------------------------------------------------------------------
/**
* Verifies that the <code>AbstractService</code> is in valid state before
* it is started. If subclasses override, they must call <code>super.validate()</code>.
*/
@Override
protected void validate()
{
if (isValid())
return;
super.validate();
if (defaultChannels != null)
{
for (Iterator<String> iter = defaultChannels.iterator(); iter.hasNext();)
{
String id = iter.next();
if (!getMessageBroker().getChannelIds().contains(id))
{
iter.remove();
if (Log.isWarn())
{
Log.getLogger(getLogCategory()).warn("Removing the Channel "+id+" from Destination "+getId()+
"as MessageBroker does not know the channel");
}
}
}
}
else
{
defaultChannels = getMessageBroker().getDefaultChannels();
}
}
/**
* Starts the service if its associated <code>MessageBroker</code> is started.
* and if the service is not already running. The default implementation of
* this method starts all of the destinations of the service.
* If subclasses override, they must call <code>super.start()</code>.
*/
@Override
public void start()
{
if (isStarted())
{
// Needed for destinations added after startup.
startDestinations();
return;
}
// Check if the MessageBroker is started
MessageBroker broker = getMessageBroker();
if (!broker.isStarted())
{
if (Log.isWarn())
{
Log.getLogger(getLogCategory()).warn("Service with id '{0}' cannot be started" +
" when the MessageBroker is not started.",
new Object[]{getId()});
}
return;
}
// Set up management
if (isManaged() && broker.isManaged())
{
setupServiceControl(broker);
MessageBrokerControl controller = (MessageBrokerControl)broker.getControl();
if (getControl() != null)
controller.addService(getControl().getObjectName());
}
super.start();
startDestinations();
}
/**
* The default implementation of this method stops all of the destinations
* of the service.
* If subclasses override, they must call <code>super.stop()</code>.
*/
@Override
public void stop()
{
if (!isStarted())
{
return;
}
stopDestinations();
super.stop();
// Remove management
if (isManaged() && getMessageBroker().isManaged())
{
if (getControl() != null)
{
getControl().unregister();
setControl(null);
}
setManaged(false);
}
}
//--------------------------------------------------------------------------
//
// Public Getters and Setters for AbstractService properties
//
//--------------------------------------------------------------------------
/**
* Returns the adapters registered with the <code>AbstractService</code>.
*
* @return The Map of adapter id and classes.
*/
public Map<String, String> getRegisteredAdapters()
{
return adapterClasses;
}
/**
* Registers the adapter with the <code>AbstractService</code>.
*
* @param id The id of the adapter.
* @param adapterClass The class of the adapter.
* @return The previous adapter class that the id was associated with.
*/
public String registerAdapter(String id, String adapterClass)
{
return adapterClasses.put(id, adapterClass);
}
/**
* Unregistered the adapter with the <code>AbstractService</code> and
* set the default adapter to <code>null</code> if needed.
*
* @param id The id of the adapter.
* @return The adapter class that the id was associated with.
*/
public String unregisterAdapter(String id)
{
if (id != null && id.equals(defaultAdapterId))
defaultAdapterId = null;
return adapterClasses.remove(id);
}
/**
* Returns the id of the default adapter of the <code>AbstractService</code>.
*
* @return defaultAdapterId The id of the default adapter of the <code>AbstractService</code>.
*/
public String getDefaultAdapter()
{
return defaultAdapterId;
}
/**
* Sets the default adapter of the <code>AbstractService</code>.
*
* @param id The id of the default adapter.
*/
public void setDefaultAdapter(String id)
{
if (adapterClasses.get(id) == null)
{
// No adapter with id '{0}' is registered with the service '{1}'.
ConfigurationException ex = new ConfigurationException();
ex.setMessage(ConfigurationConstants.UNREGISTERED_ADAPTER, new Object[]{id, getId()});
throw ex;
}
defaultAdapterId = id;
}
/**
* Returns the list of channel ids of the <code>AbstractService</code>.
*
* @return list of default channels
*/
public List<String> getDefaultChannels()
{
return defaultChannels;
}
/**
* Adds the channel to the list of channels of the <code>AbstractService</code>.
* <code>MessageBroker</code> has to know the channel. Otherwise, the channel
* is not added to the list.
*
* @param id The id of the channel.
*/
public void addDefaultChannel(String id)
{
if (defaultChannels == null)
defaultChannels = new ArrayList<String>();
else if (defaultChannels.contains(id))
return;
if (isStarted())
{
List<String> channelIds = getMessageBroker().getChannelIds();
if (channelIds == null || !channelIds.contains(id))
{
// No channel with id ''{0}'' is known by the MessageBroker.
if (Log.isWarn())
{
Log.getLogger(getLogCategory()).warn("No channel with id '{0}' is known by the MessageBroker." +
" Not adding the channel.",
new Object[]{id});
}
return;
}
}
// Either message broker knows about the channel, or service is not
// running and channel will be checked during startup
defaultChannels.add(id);
}
/**
* Sets the channel list of the <code>AbstractService</code>.
* <code>MessageBroker</code> has to know the channels, otherwise they
* are not added to the list.
*
* @param ids List of channel ids.
*/
public void setDefaultChannels(List<String> ids)
{
if (ids != null && isStarted())
{
List<String> channelIds = getMessageBroker().getChannelIds();
for (Iterator<String> iter = ids.iterator(); iter.hasNext();)
{
String id = iter.next();
if (channelIds == null || !channelIds.contains(id))
{
iter.remove();
if (Log.isWarn())
{
Log.getLogger(getLogCategory()).warn("No channel with id '{0}' is known by the MessageBroker." +
" Not adding the channel.",
new Object[]{id});
}
}
}
}
// Otherwise, channels will be checked before startup
defaultChannels = ids;
}
/**
* Removes the channel from the list of channels for the <code>AbstractService</code>.
*
* @param id The id of the channel.
* @return <code>true</code> if the list contained the channel id.
*/
public boolean removeDefaultChannel(String id)
{
return defaultChannels != null && defaultChannels.remove(id);
}
/**
* Returns the <code>Destination</code> that the <code>Message</code> targets.
*
* @param message the message to examine
* @return The <code>Destination</code> that the <code>Message</code> targets.
* @throws MessageException if no such <code>Destination</code> exists.
*/
public Destination getDestination(Message message)
{
String id = message.getDestination();
Destination result = getDestination(id);
if (result == null)
{
throw new MessageException
("No destination '" + id + "' exists in service " + getClass().getName());
}
return result;
}
/**
* Returns the <code>Destination</code> with the specified id or null if no
* <code>Destination</code> with id exists.
*
* @param id The id of the <code>Destination</code>.
* @return the destination
*/
public Destination getDestination(String id)
{
return destinations.get(id);
}
/**
* Returns a read-only Map of <code>Destination</code> ids and instances.
*
* @return The a read-only Map of <code>Destination</code> ids and instances.
*/
public Map<String, Destination> getDestinations()
{
return Collections.unmodifiableMap(destinations);
}
/**
* Creates a <code>Destination</code> instance, sets its id, sets it manageable
* if the <code>AbstractService</code> that created it is manageable,
* and sets its <code>Service</code> to the <code>AbstractService</code> that
* created it. Note that it cannot have a null id and cannot have an id of
* a <code>Destination</code> already registered with the <code>AbstractService</code>.
*
* @param id The id of the <code>Destination</code>.
* @return The <code>Destination</code> instanced created.
*/
public Destination createDestination(String id)
{
if (id == null)
{
// Cannot add ''{0}'' with null id to the ''{1}''
ConfigurationException ex = new ConfigurationException();
ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"Destination", "Service"});
throw ex;
}
// check with the message broker to make sure that no destination with the id already exists
getMessageBroker().isDestinationRegistered(id, getId(), true);
Destination destination = new Destination();
destination.setId(id);
destination.setManaged(isManaged());
destination.setService(this);
return destination;
}
/**
* Adds the <code>Destination</code> instance to the list of destinations
* known by the <code>AbstractService</code>. It also sets destination's
* service to this <code>AbstractService</code> instance. Note that
* <code>Destination</code> cannot be null, it cannot have a null id, and it
* cannot have an id of a <code>Destination</code> already registered with
* the <code>AbstractService</code>.
*
* <code>Destination</code> needs to be started if the <code>AbstractService</code>
* is already running.
*
* @param destination The <code>Destination</code> instance to be added.
*/
public void addDestination(Destination destination)
{
if (destination == null)
{
// Cannot add null ''{0}'' to the ''{1}''
ConfigurationException ex = new ConfigurationException();
ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{"Destination", "Service"});
throw ex;
}
String id = destination.getId();
if (id == null)
{
// Cannot add ''{0}'' with null id to the ''{1}''
ConfigurationException ex = new ConfigurationException();
ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"Destination", "Service"});
throw ex;
}
// No need to add if the destination is already there
if (getDestination(id) == destination)
{
return;
}
// Register with the MessageBroker first to make sure no destination
// with the same id exists in another service.
getMessageBroker().registerDestination(id, getId());
destinations.put(id, destination);
if (destination.getService() == null || destination.getService() != this)
{
destination.setService(this);
}
}
/**
* Removes the <code>Destination</code> from the list of destinations known
* by the <code>AbstractService</code>.
*
* @param id The id of the <code>Destination</code>.
* @return Previous <code>Destination</code> associated with the id.
*/
public Destination removeDestination(String id)
{
Destination destination = destinations.get(id);
if (destination != null)
{
destination.stop();
destinations.remove(id);
getMessageBroker().unregisterDestination(id);
}
return destination;
}
/**
* Sets the id of the <code>AbstractService</code>. If the <code>AbstractService</code>
* has a <code>MessageBroker</code> assigned, it also updates the id in the
* <code>MessageBroker</code>.
*
* @param id the id
*/
@Override
public void setId(String id)
{
String oldId = getId();
super.setId(id);
// Update the service id in the broker
MessageBroker broker = getMessageBroker();
if (broker != null)
{
// broker must have the service then
broker.removeService(oldId);
broker.addService(this);
}
}
/**
* Returns the <code>MessageBroker</code> of the <code>AbstractService</code>.
*
* @return MessageBroker of the <code>AbstractService</code>.
*/
public MessageBroker getMessageBroker()
{
return (MessageBroker)getParent();
}
/**
* Sets the <code>MessageBroker</code> of the <code>AbstractService</code>.
* Removes the <code>AbstractService</code> from the old broker
* (if there was one) and adds to the list of services in the new broker.
*
* @param broker <code>MessageBroker</code> of the <code>AbstractService</code>.
*/
public void setMessageBroker(MessageBroker broker)
{
MessageBroker oldBroker = getMessageBroker();
setParent(broker);
if (oldBroker != null)
{
oldBroker.removeService(getId());
}
// Add service to the new broker if needed
if (broker.getService(getId()) != this)
broker.addService(this);
}
//--------------------------------------------------------------------------
//
// Other Public APIs
//
//--------------------------------------------------------------------------
/**
* Calls {@link AbstractService#describeService(Endpoint, boolean)} with
* the passed in endpoint and boolean value of true.
*
* @param endpoint Endpoint used to filter the destinations of the service;
* no filtering is done if the endpoint is null.
* @return ConfigMap of service properties.
* @see flex.messaging.services.AbstractService#describeService(Endpoint, boolean)
*/
public ConfigMap describeService(Endpoint endpoint)
{
return describeService(endpoint, true);
}
/**
*
* Returns a <tt>ConfigMap</tt> of service properties that the client needs.
* The <tt>allDestinations</tt> flag controls whether configuration for all
* destinations or only reliable client destinations is returned.
* Subclasses can override to return properties relevant to their implementation.
*
* @param endpoint Endpoint used to filter the destinations of the service.
* No filtering is done if the endpoint is null.
* @param onlyReliable When false, configuration for all destinations is
* returned instead of only reliable destinations.
* @return ConfigMap of service properties.
*/
public ConfigMap describeService(Endpoint endpoint, boolean onlyReliable)
{
ConfigMap serviceConfig = null;
// Scan for reliable destinations; if any are found they need to be shipped to the client.
for (Destination destination : destinations.values())
{
boolean endpointIdMatches = false;
if (endpoint == null) // No need to check against destination channels.
{
endpointIdMatches = true;
}
else // One of the destination ids should match.
{
List<String> channels = destination.getChannels();
if (channels != null)
{
for (String channelId : channels)
{
if (channelId.equals(endpoint.getId()))
{
endpointIdMatches = true;
break;
}
}
}
}
if (endpointIdMatches)
{
ConfigMap destinationConfig = destination.describeDestination(onlyReliable);
if (destinationConfig != null && destinationConfig.size() > 0)
{
if (serviceConfig == null) // Lazy-init only if destinations exist.
{
serviceConfig = new ConfigMap();
serviceConfig.addProperty(ConfigurationConstants.ID_ATTR, getId());
}
serviceConfig.addProperty(ConfigurationConstants.DESTINATION_ELEMENT, destinationConfig);
}
}
}
return serviceConfig;
}
/**
* Processes messages by invoking the requested destination's adapter.
* Subclasses should provide their implementation.
*
* @param message the message to process
* @return the result
*/
public abstract Object serviceMessage(Message message);
/**
* {@inheritDoc}
*/
public Object serviceCommand(CommandMessage message)
{
Object result = serviceCommonCommands(message);
if (result != null)
{
// TODO: ServiceControl needs this method.
/*
if (isManaged())
{
((ServiceControl)getControl()).incrementServiceCommandCount();
}
*/
return result;
}
throw new MessageException("Service Does Not Support Command Type " + message.getOperation());
}
//--------------------------------------------------------------------------
//
// Protected/private methods.
//
//--------------------------------------------------------------------------
protected Object serviceCommonCommands(CommandMessage message)
{
Object commandResult = null;
if (message.getOperation() == CommandMessage.CLIENT_PING_OPERATION)
{
commandResult = Boolean.TRUE;
}
else if (message.getOperation() == CommandMessage.CLUSTER_REQUEST_OPERATION)
{
ClusterManager clusterManager = getMessageBroker().getClusterManager();
String serviceType = getClass().getName();
String destinationName = message.getDestination();
if (clusterManager.isDestinationClustered(serviceType, destinationName))
{
commandResult = clusterManager.getEndpointsForDestination(serviceType, destinationName);
}
else
{
// client should never send this message if its local
// config declares the destination is not clustered
commandResult = Boolean.FALSE;
}
}
return commandResult;
}
/**
* Returns the log category of the <code>AbstractService</code>. Subclasses
* can override to provide a more specific logging category.
*
* @return The log category.
*/
@Override
protected String getLogCategory()
{
return LOG_CATEGORY;
}
/**
* Invoked automatically to allow the <code>AbstractService</code> to setup its corresponding
* MBean control. Subclasses should override to setup and register their MBean control.
* Manageable subclasses should override this template method.
*
* @param broker The <code>MessageBroker</code> that manages this <code>AbstractService</code>.
*/
protected abstract void setupServiceControl(MessageBroker broker);
/**
* Start all of the destinations of the service.
*/
private void startDestinations()
{
for (Destination destination : destinations.values())
{
long timeBeforeStartup = 0;
if (Log.isDebug())
timeBeforeStartup = System.currentTimeMillis();
destination.start();
if (Log.isDebug())
{
long timeAfterStartup = System.currentTimeMillis();
Long diffMillis = timeAfterStartup - timeBeforeStartup;
Log.getLogger(LOG_CATEGORY_STARTUP_DESTINATION).debug("Destination with id '{0}' is ready (startup time: '{1}' ms)",
new Object[]{destination.getId(), diffMillis});
}
}
}
/**
* Stop all of the destinations of the service.
*/
private void stopDestinations()
{
for (Destination destination : destinations.values())
{
destination.stop();
}
}
}