blob: 9b1cdef0a1b9089ae84134acb1c1849ef4d9a21c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.cluster;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeSet;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import flex.messaging.Destination;
import flex.messaging.MessageBroker;
import flex.messaging.config.ClusterSettings;
import flex.messaging.config.ConfigMap;
import flex.messaging.endpoints.Endpoint;
import flex.messaging.util.ClassUtil;
/**
* The manager of all clusters defined in services-config.xml, and the broker
* for the clusters created for clustered destinations.
*/
public class ClusterManager {
    /**
     * Supported operations.
     */
    public static final String OPERATION_ADD_ENDPOINT_FOR_CHANNEL = "addEndpointForChannel";
    public static final String OPERATION_SEND_ENDPOINT_URL = "sendEndpointUrl";
    public static final String OPERATION_RECEIVE_ENDPOINT_URL = "receiveEndpointUrl";
    public static final String OPERATION_PUSH_MESSAGE_FROM_PEER = "pushMessageFromPeer";
    public static final String OPERATION_PEER_SYNC_AND_PUSH = "peerSyncAndPush";
    public static final String OPERATION_REQUEST_ADAPTER_STATE = "requestAdapterState";
    public static final String OPERATION_RECEIVE_ADAPTER_STATE = "receiveAdapterState";
    public static final String OPERATION_SEND_SUBSCRIPTIONS = "sendSubscriptions";
    public static final String OPERATION_RECEIVE_SUBSCRIPTIONS = "receiveSubscriptions";
    public static final String OPERATION_SUBSCRIBE_FROM_PEER = "subscribeFromPeer";
    public static final String OPERATION_PUSH_MESSAGE_FROM_PEER_TO_PEER = "pushMessageFromPeerToPeer";
    public static final String OPERATION_PEER_SYNC_AND_PUSH_ONE_TO_PEER = "peerSyncAndPushOneToPeer";

    /**
     * A link to the MessageBroker.
     */
    private final MessageBroker broker;

    /**
     * A mapping between the cluster ids and the Cluster instances, kept in
     * the order in which the clusters were created.
     * name=clusterId value=clusterInstance
     */
    private final Map<String, Cluster> clusters;

    /**
     * A mapping between destinations and the Cluster instances. The key is
     * the value returned by <code>Cluster.getClusterDestinationKey</code>.
     */
    private final Map<String, Cluster> clustersForDestination;

    /**
     * A mapping between cluster ids and the root element of their parsed
     * XML configuration files.
     * name=clusterId value=root element of the properties file
     */
    private final Map<String, Element> clusterConfig;

    /**
     * A mapping between cluster ids and ClusterSettings instances.
     * name=clusterId value=ClusterSettings
     */
    private final Map<String, ClusterSettings> clusterSettings;

    /**
     * A mapping between destinations and a boolean representing
     * whether or not the backend for the destination is shared.
     */
    private final Map<String, Boolean> backendSharedForDestination;

    /**
     * The default cluster when the cluster id for the destination
     * is unspecified. Remains <code>null</code> until the cluster whose id
     * equals the default cluster id has been created.
     */
    private Cluster defaultCluster;

    /**
     * The id of the default cluster.
     */
    private String defaultClusterId;

    /**
     * The manager of all clusters defined in services-config.xml, and the broker
     * for the clusters created for clustered destinations. This class provides
     * an entry point and abstraction to the logical cluster implementation as
     * well as the specific cluster implementation.
     *
     * @param broker the message broker which uses the cluster manager
     */
    public ClusterManager(MessageBroker broker) {
        this.broker = broker;
        clusters = new LinkedHashMap<String, Cluster>();
        clusterConfig = new HashMap<String, Element>();
        clusterSettings = new HashMap<String, ClusterSettings>();
        clustersForDestination = new HashMap<String, Cluster>();
        backendSharedForDestination = new HashMap<String, Boolean>();
    }

    /**
     * The MessageBroker for this cluster.
     *
     * @return The defined MessageBroker.
     */
    public MessageBroker getMessageBroker() {
        return broker;
    }

    /**
     * The default cluster when the cluster id for the destination
     * is unspecified.
     *
     * @return Cluster the default Cluster to use, or <code>null</code> if none has been created
     */
    public Cluster getDefaultCluster() {
        return defaultCluster;
    }

    /**
     * The id of the default cluster.
     *
     * @return String the default cluster ID
     */
    public String getDefaultClusterId() {
        return defaultClusterId;
    }

    /**
     * Invoke an endpoint operation across the cluster.
     * <p>
     * NOTE: Endpoints don't reference a specific cluster so the default cluster is used for the broadcast.
     * If no default cluster is defined the operation is broadcast over all defined clusters.
     * </p>
     *
     * @param endpointId The id of the remote endpoint across the cluster to invoke an operation on.
     * @param operationName The name of the operation to invoke.
     * @param params The arguments to use for operation invocation.
     */
    public void invokeEndpointOperation(String endpointId, String operationName, Object[] params) {
        Object[] arguments = prependArguments(endpointId, operationName, params);
        if (defaultCluster != null) {
            defaultCluster.broadcastServiceOperation(operationName, arguments);
        } else {
            for (Cluster cluster : clusters.values()) {
                cluster.broadcastServiceOperation(operationName, arguments);
            }
        }
    }

    /**
     * Invoke an endpoint operation on a specific peer within the cluster.
     * <p>
     * NOTE: Endpoints don't reference a specific cluster so the default cluster is used to reach the peer.
     * If no default cluster is defined the operation is sent to the target address over every defined cluster.
     * </p>
     *
     * @param endpointId The id of the remote endpoint across the cluster to invoke an operation on.
     * @param operationName The name of the operation to invoke.
     * @param params The arguments to use for operation invocation.
     * @param targetAddress The peer node that the operation should be invoked on.
     */
    public void invokePeerToPeerEndpointOperation(String endpointId, String operationName, Object[] params, Object targetAddress) {
        Object[] arguments = prependArguments(endpointId, operationName, params);
        if (defaultCluster != null) {
            defaultCluster.sendPointToPointServiceOperation(operationName, arguments, targetAddress);
        } else {
            for (Cluster cluster : clusters.values()) {
                cluster.sendPointToPointServiceOperation(operationName, arguments, targetAddress);
            }
        }
    }

    /**
     * Invoke a service-related operation, which usually includes a Message as a method parameter. This method
     * allows a local service to process a Message and then send the Message to the services on all peer nodes
     * so that they may perform the same processing. Invoke the service operation for the cluster, identified by
     * serviceType and destinationName.
     * <p>
     * The destination must resolve to a cluster (see {@link #isDestinationClustered(String, String)});
     * otherwise a <code>NullPointerException</code> results.
     * </p>
     *
     * @param serviceType The name for the service for this destination.
     * @param destinationName The name of the destination.
     * @param operationName The name of the service operation to invoke.
     * @param params Parameters needed for the service operation.
     */
    public void invokeServiceOperation(String serviceType, String destinationName,
                                       String operationName, Object[] params) {
        Cluster c = getCluster(serviceType, destinationName);
        Object[] arguments = prependArguments(serviceType, destinationName, params);
        c.broadcastServiceOperation(operationName, arguments);
    }

    /**
     * Send a service-related operation in point-to-point fashion to one and only one member of the cluster.
     * This is similar to the invokeServiceOperation except that this invocation is sent to the node,
     * identified by targetAddress.
     * <p>
     * The destination must resolve to a cluster (see {@link #isDestinationClustered(String, String)});
     * otherwise a <code>NullPointerException</code> results.
     * </p>
     *
     * @param serviceType The name for the service for this destination.
     * @param destinationName The name of the destination.
     * @param operationName The name of the service operation to invoke.
     * @param params Parameters needed for the service operation.
     * @param targetAddress The node that the operation should be passed to.
     */
    public void invokePeerToPeerOperation(String serviceType, String destinationName,
                                          String operationName, Object[] params, Object targetAddress) {
        Cluster c = getCluster(serviceType, destinationName);
        Object[] arguments = prependArguments(serviceType, destinationName, params);
        c.sendPointToPointServiceOperation(operationName, arguments, targetAddress);
    }

    /**
     * Determines whether the given destination is clustered. A destination without
     * its own cluster mapping is still reported as clustered when a default cluster exists.
     *
     * @param serviceType The name for the service for this destination.
     * @param destinationName The name of the destination.
     * @return Whether the destination is a clustered destination.
     */
    public boolean isDestinationClustered(String serviceType, String destinationName) {
        return getCluster(serviceType, destinationName) != null;
    }

    /**
     * Checks whether the given destination is configured for a shared backend.
     *
     * @param serviceType The name of the service for this destination.
     * @param destinationName The name of the destination.
     * @return Whether the destination is configured for shared backend;
     * <code>false</code> if nothing has been recorded for the destination.
     */
    public boolean isBackendShared(String serviceType, String destinationName) {
        String destKey = Cluster.getClusterDestinationKey(serviceType, destinationName);
        return Boolean.TRUE.equals(backendSharedForDestination.get(destKey));
    }

    /**
     * Retrieves a list of cluster nodes for the given cluster.
     *
     * @param serviceType The name of the service for the clustered destination.
     * @param destinationName The name of the destination.
     * @return List of cluster nodes for the given cluster, or an empty list if the
     * destination does not resolve to a cluster.
     */
    public List getClusterMemberAddresses(String serviceType, String destinationName) {
        Cluster c = getCluster(serviceType, destinationName);
        return c != null ? c.getMemberAddresses() : Collections.EMPTY_LIST;
    }

    /**
     * Used for targeted endpoint operation invocations across the cluster.
     * If a default cluster is defined, its list of member addresses is returned.
     * Otherwise, a de-duped list of all member addresses from all registered clusters is returned.
     *
     * @return The list of cluster nodes that endpoint operation invocations can be issued against.
     */
    public List getClusterMemberAddresses() {
        if (defaultCluster != null) {
            return defaultCluster.getMemberAddresses();
        }
        // A TreeSet de-dupes the addresses and yields them in their natural order.
        TreeSet<Object> uniqueAddresses = new TreeSet<Object>();
        for (Cluster cluster : clusters.values()) {
            uniqueAddresses.addAll(cluster.getMemberAddresses());
        }
        return new ArrayList<Object>(uniqueAddresses);
    }

    /**
     * Find the properties file in the given cluster settings. Read the XML based
     * cluster configuration file and save the settings and configuration for the
     * given cluster for retrieval later. The file is looked up as an internal path
     * first and as an external path second; the stream is closed once parsing has
     * finished, whether or not it succeeded.
     *
     * @param settings The cluster settings for a specific cluster.
     */
    public void prepareCluster(ClusterSettings settings) {
        String propsFileName = settings.getPropsFileName();
        checkForNullPropertiesFile(settings.getClusterName(), propsFileName);
        InputStream propsFile = resolveInternalPath(propsFileName);
        if (propsFile == null) {
            propsFile = resolveExternalPath(propsFileName);
        }
        if (propsFile == null) {
            throwClusterException(10208, new Object[]{propsFileName}, null);
        }
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(false);
            factory.setValidating(false);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(propsFile);
            if (settings.isDefault()) {
                defaultClusterId = settings.getClusterName();
            }
            clusterConfig.put(settings.getClusterName(), doc.getDocumentElement());
            clusterSettings.put(settings.getClusterName(), settings);
        } catch (Exception ex) {
            throwClusterException(10213, new Object[]{propsFileName}, ex);
        } finally {
            // The stream was opened on behalf of this method, so release it here.
            closeQuietly(propsFile);
        }
    }

    /**
     * Retrieve the local address for the specified clustered destination.
     *
     * @param serviceType The service type of the clustered destination.
     * @param destinationName The name of the clustered destination.
     * @return The local address of the clustered destination, or <code>null</code>
     * if the destination does not resolve to a cluster.
     */
    public Object getLocalAddress(String serviceType, String destinationName) {
        Cluster c = getCluster(serviceType, destinationName);
        return c != null ? c.getLocalAddress() : null;
    }

    /**
     * Retrieve the local address for the default cluster or if no default cluster is defined
     * return the local address derived from the first cluster of any defined.
     *
     * @return The local address for this cluster node, or <code>null</code> if this node
     * is not a member of any cluster.
     */
    public Object getLocalAddress() {
        if (defaultCluster != null) {
            return defaultCluster.getLocalAddress();
        }
        // Else, use first defined cluster; null when no cluster is defined.
        Iterator<Cluster> firstCluster = clusters.values().iterator();
        return firstCluster.hasNext() ? firstCluster.next().getLocalAddress() : null;
    }

    /**
     * Find the cluster for the specified cluster id.
     *
     * @param clusterId the cluster ID
     * @return The cluster identified by the given id, or <code>null</code> if no
     * cluster has been created for that id.
     */
    public Cluster getClusterById(String clusterId) {
        return clusters.get(clusterId);
    }

    /**
     * Find the cluster identified by the service type and destination name.
     * Falls back to the default cluster when the destination has no cluster of its own.
     *
     * @param serviceType The service type of the clustered destination.
     * @param destinationName The name of the clustered destination.
     * @return The cluster identified by the service type and destination name, the
     * default cluster if there is no such mapping, or <code>null</code> if there is neither.
     */
    public Cluster getCluster(String serviceType, String destinationName) {
        Cluster cluster = null;
        try {
            String destKey = Cluster.getClusterDestinationKey(serviceType, destinationName);
            cluster = clustersForDestination.get(destKey);
            if (cluster == null) {
                cluster = defaultCluster;
            }
        } catch (NoClassDefFoundError nex) {
            // NOTE(review): presumably raised when classes required by the cluster
            // implementation are missing from the classpath - confirm.
            ClusterException cx = new ClusterException();
            cx.setMessage(10202, new Object[]{destinationName});
            cx.setRootCause(nex);
            throw cx;
        }
        return cluster;
    }

    /**
     * Call destroy on each of the managed clusters and remove them from the
     * id-to-cluster mapping.
     */
    public void destroyClusters() {
        // NOTE(review): defaultCluster and clustersForDestination are left untouched and so
        // keep referencing the destroyed clusters - confirm whether callers rely on that.
        for (Iterator<Cluster> iter = clusters.values().iterator(); iter.hasNext(); ) {
            Cluster cluster = iter.next();
            cluster.destroy();
            iter.remove();
        }
    }

    /**
     * Add the specified destination to the cluster, identified by clusterId if available. If the cluster
     * is not currently defined, create the cluster. Also, setup the load balancing urls and shared
     * backend information for this clustered destination and endpoint.
     *
     * @param clusterId The cluster id that this destination wants to be associated with.
     * @param serviceType The service type for the clustered destination.
     * @param destinationName The name of the clustered destination.
     * @param channelId The channel id that should be added to the cluster load balancing.
     * @param endpointUrl The endpoint url that should be added to the cluster load balancing.
     * @param endpointPort The endpoint port that should be added to the cluster load balancing.
     * @param sharedBackend Whether the destination has shared backend set to true or not.
     */
    public void clusterDestinationChannel(String clusterId, String serviceType, String destinationName,
                                          String channelId, String endpointUrl, int endpointPort, boolean sharedBackend) {
        Cluster cluster = getClusterById(clusterId);
        String destKey = Cluster.getClusterDestinationKey(serviceType, destinationName);
        if (cluster == null) {
            // A cluster can only be created from a configuration that prepareCluster stored earlier.
            if (!clusterConfig.containsKey(clusterId)) {
                ClusterException cx = new ClusterException();
                cx.setMessage(10207, new Object[]{destinationName, clusterId});
                throw cx;
            }
            // createCluster also registers the destination against the new cluster.
            cluster = createCluster(clusterId, serviceType, destinationName);
        } else {
            clustersForDestination.put(destKey, cluster);
        }
        backendSharedForDestination.put(destKey, Boolean.valueOf(sharedBackend));
        if (cluster.getURLLoadBalancing()) {
            cluster.addLocalEndpointForChannel(serviceType, destinationName,
                    channelId, endpointUrl, endpointPort);
        }
    }

    /**
     * Adds the destination to the cluster. The settings for the clustered destination are
     * available from the <code>Destination</code> object. Every channel of the destination
     * is registered through
     * {@link #clusterDestinationChannel(String, String, String, String, String, int, boolean)}.
     *
     * @param destination The destination to be clustered.
     */
    public void clusterDestination(Destination destination) {
        String clusterId = destination.getNetworkSettings().getClusterId();
        if (clusterId == null) {
            clusterId = getDefaultClusterId();
        }
        ClusterSettings cls = clusterSettings.get(clusterId);
        if (cls == null) {
            ClusterException ce = new ClusterException();
            ce.setMessage(10217, new Object[]{destination.getId(), clusterId});
            throw ce;
        }
        for (String channelId : destination.getChannels()) {
            Endpoint endpoint = broker.getEndpoint(channelId);
            String endpointUrl = endpoint.getUrl();
            int endpointPort = endpoint.getPort();
            // This is only an error if we are using client side url-based load balancing. If
            // there is a HW load balancer, then we can assume the server.name served up by the
            // SWF can be used to access the cluster members. With client side load balancing,
            // the clients need the direct URLs of all of the servers.
            if (cls.getURLLoadBalancing()) {
                // Ensure that the endpoint URI does not contain any replacement tokens.
                String token = findReplacementToken(endpointUrl);
                if (token != null) {
                    ClusterException ce = new ClusterException();
                    ce.setMessage(10209, new Object[]{destination.getId(), channelId, token});
                    throw ce;
                }
            }
            clusterDestinationChannel(clusterId, destination.getServiceType(),
                    destination.getId(), channelId, endpointUrl, endpointPort,
                    destination.getNetworkSettings().isSharedBackend());
        }
    }

    /**
     * Get a list of endpoints for the destination.
     *
     * @param serviceType the service type
     * @param destinationName destination name
     * @return List the list endpoints that the destination can use, or <code>null</code>
     * if the destination does not resolve to a cluster
     */
    public List getEndpointsForDestination(String serviceType, String destinationName) {
        Cluster c = getCluster(serviceType, destinationName);
        return c != null ? c.getAllEndpoints(serviceType, destinationName) : null;
    }

    /**
     * Builds a new argument array consisting of the two leading values followed by
     * all elements of <code>params</code>, in order.
     *
     * @param first The value stored at index 0.
     * @param second The value stored at index 1.
     * @param params The remaining arguments; must not be <code>null</code>.
     * @return A new array of length <code>params.length + 2</code>.
     */
    private static Object[] prependArguments(Object first, Object second, Object[] params) {
        Object[] arguments = new Object[2 + params.length];
        arguments[0] = first;
        arguments[1] = second;
        System.arraycopy(params, 0, arguments, 2, params.length);
        return arguments;
    }

    /**
     * Locates the first replacement token in the given url. A token starts at the first
     * '{' and runs through the next '}', or to the end of the url when no '}' follows.
     *
     * @param url The url to inspect.
     * @return The token text including its braces, or <code>null</code> if the url contains no '{'.
     */
    private static String findReplacementToken(String url) {
        int tokenStart = url.indexOf('{');
        if (tokenStart == -1) {
            return null;
        }
        int closingBrace = url.indexOf('}', tokenStart);
        int tokenEnd = (closingBrace == -1) ? url.length() : closingBrace + 1;
        return url.substring(tokenStart, tokenEnd);
    }

    /**
     * Closes the given stream, ignoring a <code>null</code> stream and any failure to close.
     *
     * @param stream The stream to close; may be <code>null</code>.
     */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException ignored) {
            // Best effort: a failure to close must not mask the outcome of the parse.
        }
    }

    /**
     * Raises cluster exception 10201 when no properties file name was configured.
     *
     * @param clusterName The name of the cluster being prepared.
     * @param propsFileName The configured properties file name, possibly <code>null</code>.
     */
    private void checkForNullPropertiesFile(String clusterName, String propsFileName) {
        if (propsFileName == null) {
            throwClusterException(10201, new Object[]{clusterName, propsFileName}, null);
        }
    }

    /**
     * Create the cluster based on the cluster settings already available. The cluster
     * is added to the cluster managers list of clusters indexed by the cluster id.
     * The cluster is also associated with the specified service type and destination
     * name. The cluster id is unique across all clusters managed by this cluster
     * manager. The cluster may be associated with more than one cluster destination.
     *
     * @param clusterId The cluster id.
     * @param serviceType The service type of the clustered destination.
     * @param destinationName The destination name for the clustered destination.
     * @return The new cluster.
     */
    private Cluster createCluster(String clusterId, String serviceType, String destinationName) {
        String destKey = Cluster.getClusterDestinationKey(serviceType, destinationName);
        Element propsFile = clusterConfig.get(clusterId);
        ClusterSettings cls = clusterSettings.get(clusterId);
        Cluster cluster = null;
        Class<?> clusterClass = ClassUtil.createClass(cls.getImplementationClass());
        Constructor<?> clusterConstructor = null;
        try {
            // The implementation class must expose a constructor taking the ClusterManager.
            clusterConstructor = clusterClass.getConstructor(ClusterManager.class);
        } catch (Exception e) {
            ClusterException cx = new ClusterException();
            cx.setMessage(10210);
            cx.setRootCause(e);
            throw cx;
        }
        try {
            cluster = (Cluster) clusterConstructor.newInstance(this);
            cluster.setClusterPropertiesFile(propsFile);
            cluster.setURLLoadBalancing(cls.getURLLoadBalancing());
            cluster.initialize(clusterId, cls.getProperties());
        } catch (Exception e) {
            ClusterException cx = new ClusterException();
            cx.setMessage(10211);
            cx.setRootCause(e);
            throw cx;
        }
        clustersForDestination.put(destKey, cluster);
        clusters.put(clusterId, cluster);
        if (defaultClusterId != null && defaultClusterId.equals(clusterId)) {
            defaultCluster = cluster;
        }
        return cluster;
    }

    /**
     * Asks the broker to resolve the properties file as an external path, converting
     * any failure into cluster exception 10208.
     *
     * @param propsFileName The properties file name to resolve.
     * @return The stream handed back by the broker.
     */
    private InputStream resolveExternalPath(String propsFileName) {
        try {
            return broker.resolveExternalPath(propsFileName);
        } catch (Throwable t) {
            throwClusterException(10208, new Object[]{propsFileName}, t);
        }
        return null;
    }

    /**
     * Asks the broker to resolve the properties file as an internal path, converting
     * any failure into cluster exception 10208.
     *
     * @param propsFileName The properties file name to resolve.
     * @return The stream handed back by the broker.
     */
    private InputStream resolveInternalPath(String propsFileName) {
        try {
            return broker.resolveInternalPath(propsFileName);
        } catch (Throwable t) {
            throwClusterException(10208, new Object[]{propsFileName}, t);
        }
        return null;
    }

    /**
     * Builds and throws a ClusterException; this method never returns normally.
     *
     * @param number The message number of the exception.
     * @param args The arguments for the message.
     * @param t The root cause to attach, or <code>null</code> if there is none.
     */
    private void throwClusterException(int number, Object[] args, Throwable t) {
        ClusterException cx = new ClusterException();
        cx.setMessage(number, args);
        if (t != null) {
            cx.setRootCause(t);
        }
        throw cx;
    }

    /**
     * Return a {@link ConfigMap} describing the clusters that have been added to the cluster manager.
     * Each cluster contributes a "cluster" property holding its "id", "properties" file name,
     * implementation "class" and, for the default cluster only, "default" set to "true".
     *
     * @return a ConfigMap of the clusters
     */
    public ConfigMap describeClusters() {
        ConfigMap result = new ConfigMap();
        for (Entry<String, Cluster> entry : clusters.entrySet()) {
            Cluster cluster = entry.getValue();
            ConfigMap clusterMap = new ConfigMap();
            clusterMap.put("id", entry.getKey());
            ClusterSettings settings = clusterSettings.get(entry.getKey());
            clusterMap.put("properties", settings.getPropsFileName());
            if (settings.isDefault()) {
                clusterMap.put("default", "true");
            }
            clusterMap.put("class", cluster.getClass().getCanonicalName());
            result.addProperty("cluster", clusterMap);
        }
        return result;
    }
}