blob: e22d2e2c6f06d78838e71eef567636780e12c392 [file] [log] [blame]
/*
* Copyright 1999,2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.cluster.tcp;
import java.beans.PropertyChangeSupport;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Vector;
import org.apache.catalina.Container;
import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleEvent;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
import org.apache.catalina.Manager;
import org.apache.catalina.Valve;
import org.apache.catalina.cluster.CatalinaCluster;
import org.apache.catalina.cluster.ClusterManager;
import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.Constants;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.MembershipListener;
import org.apache.catalina.cluster.MembershipService;
import org.apache.catalina.cluster.MessageListener;
import org.apache.catalina.cluster.session.SessionMessage;
import org.apache.catalina.cluster.io.ListenCallback;
import org.apache.catalina.cluster.session.ReplicationStream;
import org.apache.catalina.util.LifecycleSupport;
import org.apache.catalina.util.StringManager;
import org.apache.commons.logging.Log;
import org.apache.tomcat.util.IntrospectionUtils;
/**
* A <b>Cluster</b> implementation using simple multicast.
* Responsible for setting
* up a cluster and provides callers with a valid multicast receiver/sender.
*
* @author Filip Hanik
* @author Remy Maucherat
* @version $Revision$, $Date$
*/
public class SimpleTcpCluster
implements CatalinaCluster, Lifecycle,
MembershipListener, ListenCallback,
LifecycleListener {
public static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class );
// ----------------------------------------------------- Instance Variables
/**
* Descriptive information about this component implementation.
*/
protected static final String info = "SimpleTcpCluster/1.0";
/**
* the service that provides the membership
*/
protected MembershipService membershipService = null;
/**
* Whether to expire sessions when shutting down
*/
protected boolean expireSessionsOnShutdown = true;
/**
* Print debug to std.out?
*/
protected boolean printToScreen = false;
/**
* Replicate only sessions that have been marked dirty
* false=replicate sessions after each request
*/
protected boolean useDirtyFlag = false;
/**
* Name for logging purpose
*/
protected String clusterImpName = "SimpleTcpCluster";
/**
* The string manager for this package.
*/
protected StringManager sm = StringManager.getManager(Constants.Package);
/**
* The cluster name to join
*/
protected String clusterName = null;
/**
* The Container associated with this Cluster.
*/
protected Container container = null;
/**
* The lifecycle event support for this component.
*/
protected LifecycleSupport lifecycle = new LifecycleSupport(this);
/**
* Has this component been started?
*/
protected boolean started = false;
/**
* The property change support for this component.
*/
protected PropertyChangeSupport support = new PropertyChangeSupport(this);
/**
* The context name <-> manager association for distributed contexts.
*/
protected HashMap managers = new HashMap();
/**
* Sending Stats
*/
private long nrOfMsgsReceived = 0;
private long msgSendTime = 0;
private long lastChecked = System.currentTimeMillis();
//sort members by alive time
protected MemberComparator memberComparator = new MemberComparator();
private String managerClassName = "org.apache.catalina.cluster.session.DeltaManager";
/**
* Sender to send data with
*/
private org.apache.catalina.cluster.ClusterSender clusterSender;
/**
* Receiver to register call back with
*/
private org.apache.catalina.cluster.ClusterReceiver clusterReceiver;
private org.apache.catalina.Valve valve;
private org.apache.catalina.cluster.ClusterDeployer clusterDeployer;
/**
* Listeners of messages
*/
protected Vector clusterListeners = new Vector();
// ------------------------------------------------------------- Properties
public SimpleTcpCluster() {
}
/**
* Return descriptive information about this Cluster implementation and
* the corresponding version number, in the format
* <code>&lt;description&gt;/&lt;version&gt;</code>.
*/
public String getInfo() {
return(info);
}
/**
* Set the name of the cluster to join, if no cluster with
* this name is present create one.
*
* @param clusterName The clustername to join
*/
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
/**
* Return the name of the cluster that this Server is currently
* configured to operate within.
*
* @return The name of the cluster associated with this server
*/
public String getClusterName() {
return clusterName;
}
/**
* Set the Container associated with our Cluster
*
* @param container The Container to use
*/
public void setContainer(Container container) {
Container oldContainer = this.container;
this.container = container;
support.firePropertyChange("container",
oldContainer,
this.container);
//this.container.
}
/**
* Get the Container associated with our Cluster
*
* @return The Container associated with our Cluster
*/
public Container getContainer() {
return(this.container);
}
/**
* Sets the configurable protocol stack. This is a setting in server.xml
* where you can configure your protocol.
*
* @param protocol the protocol stack - this method is called by
* the server configuration at startup
* @see <a href="www.javagroups.com">JavaGroups</a> for details
*/
public void setProtocol(String protocol) {
}
/**
* Returns the protocol.
*/
public String getProtocol() {
return null;
}
public Member[] getMembers() {
Member[] members = membershipService.getMembers();
//sort by alive time
java.util.Arrays.sort(members,memberComparator);
return members;
}
/**
* Return the member that represents this node.
* @return Member
*/
public Member getLocalMember() {
return membershipService.getLocalMember();
}
// --------------------------------------------------------- Public Methods
public synchronized Manager createManager(String name) {
log.debug("Creating ClusterManager for context "+name + " using class "+getManagerClassName());
System.out.println("\n\n\n\nCreating ClusterManager for context "+name + " using class "+getManagerClassName()+"\n\n\n\n");
ClusterManager manager = null;
try {
manager = (ClusterManager)getClass().getClassLoader().loadClass(getManagerClassName()).newInstance();
} catch ( Exception x ) {
log.error("Unable to load class for replication manager",x);
manager = new org.apache.catalina.cluster.session.SimpleTcpReplicationManager();
}
manager.setName(name);
manager.setCluster(this);
manager.setDistributable(true);
manager.setExpireSessionsOnShutdown(expireSessionsOnShutdown);
manager.setUseDirtyFlag(useDirtyFlag);
managers.put(name,manager);
return manager;
}
public void removeManager(String name) {
managers.remove(name);
}
public Manager getManager(String name) {
return (Manager)managers.get(name);
}
// ------------------------------------------------------ Lifecycle Methods
/**
* Add a lifecycle event listener to this component.
*
* @param listener The listener to add
*/
public void addLifecycleListener(LifecycleListener listener) {
lifecycle.addLifecycleListener(listener);
}
/**
* Get the lifecycle listeners associated with this lifecycle. If this
* Lifecycle has no listeners registered, a zero-length array is returned.
*/
public LifecycleListener[] findLifecycleListeners() {
return lifecycle.findLifecycleListeners();
}
/**
* Remove a lifecycle event listener from this component.
*
* @param listener The listener to remove
*/
public void removeLifecycleListener(LifecycleListener listener) {
lifecycle.removeLifecycleListener(listener);
}
/**
* Prepare for the beginning of active use of the public methods of this
* component. This method should be called after <code>configure()</code>,
* and before any of the public methods of the component are utilized.<BR>
* Starts the cluster communication channel, this will connect with the
* other nodes in the cluster, and request the current session state to
* be transferred to this node.
*
* @exception IllegalStateException if this component has already been
* started
* @exception LifecycleException if this component detects a fatal error
* that prevents this component from being used
*/
public void start()
throws LifecycleException {
if (started)
throw new LifecycleException
(sm.getString("cluster.alreadyStarted"));
log.info("Cluster is about to start");
try {
IntrospectionUtils.callMethodN(getContainer(), "addValve", new Object[] {valve}, new Class[] {valve.getClass()});
clusterReceiver.setIsSenderSynchronized(clusterSender.getIsSenderSynchronized());
clusterReceiver.setCatalinaCluster(this);
clusterReceiver.start();
clusterSender.start();
membershipService.setLocalMemberProperties(clusterReceiver.getHost(),clusterReceiver.getPort());
membershipService.addMembershipListener(this);
membershipService.start();
//set the deployer.
try {
if ( clusterDeployer != null ) {
clusterDeployer.setCluster(this);
Object deployer = IntrospectionUtils.getProperty(getContainer(), "deployer");
// FIXME: clusterDeployer.setDeployer( (org.apache.catalina.Deployer) deployer);
clusterDeployer.start();
}
} catch (Throwable x) {
log.fatal("Unable to retrieve the container deployer. Cluster deployment disabled.",x);
} //catch
this.started = true;
} catch ( Exception x ) {
log.error("Unable to start cluster.",x);
throw new LifecycleException(x);
}
}
public void send(ClusterMessage msg, Member dest) {
try
{
msg.setAddress(membershipService.getLocalMember());
Member destination = dest;
if ( msg instanceof SessionMessage ) {
SessionMessage smsg = (SessionMessage) msg;
//if we request session state, send to the oldest of members
if ((destination == null) &&
(smsg.getEventType() == SessionMessage.EVT_GET_ALL_SESSIONS) &&
(membershipService.getMembers().length > 0)) {
destination = membershipService.getMembers()[0];
}//end if
}//end if
msg.setTimestamp(System.currentTimeMillis());
java.io.ByteArrayOutputStream outs = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(outs);
out.writeObject(msg);
byte[] data = outs.toByteArray();
if(destination != null) {
Member tcpdest = dest;
if ( (tcpdest != null) && (!membershipService.getLocalMember().equals(tcpdest))) {
clusterSender.sendMessage(msg.getUniqueId(), data, tcpdest);
}//end if
}
else {
clusterSender.sendMessage(msg.getUniqueId(),data);
}
} catch ( Exception x ) {
log.error("Unable to send message through cluster sender.",x);
}
}
public void send(ClusterMessage msg) {
send(msg,null);
}
/**
* Gracefully terminate the active use of the public methods of this
* component. This method should be the last one called on a given
* instance of this component.<BR>
* This will disconnect the cluster communication channel and stop
* the listener thread.
*
* @exception IllegalStateException if this component has not been started
* @exception LifecycleException if this component detects a fatal error
* that needs to be reported
*/
public void stop()
throws LifecycleException {
if (!started)
throw new IllegalStateException
(sm.getString("cluster.notStarted"));
membershipService.stop();
membershipService.removeMembershipListener();
try {
clusterSender.stop();
} catch (Exception x ) {
log.error("Unable to stop cluster sender.",x);
}
try {
clusterReceiver.stop();
clusterReceiver.setCatalinaCluster(null);
} catch (Exception x ) {
log.error("Unable to stop cluster receiver.",x);
}
if ( clusterDeployer != null ) {
clusterDeployer.stop();
}
started = false;
}
public void memberAdded(Member member) {
try {
log.info("Replication member added:" + member);
clusterSender.add(member);
} catch ( Exception x ) {
log.error("Unable to connect to replication system.",x);
}
}
public void memberDisappeared(Member member)
{
log.info("Received member disappeared:"+member);
try
{
clusterSender.remove(member);
}
catch ( Exception x )
{
log.error("Unable remove cluster node from replication system.",x);
}
}
public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown){
this.expireSessionsOnShutdown = expireSessionsOnShutdown;
}
public void setPrintToScreen(boolean printToScreen) {
this.printToScreen = printToScreen;
}
public void setUseDirtyFlag(boolean useDirtyFlag) {
this.useDirtyFlag = useDirtyFlag;
}
public void messageDataReceived(byte[] data) {
try {
ReplicationStream stream =
new ReplicationStream(new java.io.ByteArrayInputStream(data),
getClass().getClassLoader());
Object myobj = stream.readObject();
if ( myobj != null && myobj instanceof SessionMessage ) {
SessionMessage msg = (SessionMessage)myobj;
log.debug("Assuming clocks are synched: Replication took="+(System.currentTimeMillis()-msg.getTimestamp())+" ms.");
String ctxname = msg.getContextName();
//check if the message is a EVT_GET_ALL_SESSIONS,
//if so, wait until we are fully started up
if ( ctxname == null ) {
java.util.Iterator i = managers.keySet().iterator();
while ( i.hasNext() ) {
String key = (String)i.next();
ClusterManager mgr = (ClusterManager) managers.get(key);
if (mgr != null)
mgr.messageDataReceived(msg);
else {
//this happens a lot before the system has started up
log.debug("Context manager doesn't exist:" + key);
}
}//while
} else {
ClusterManager mgr = (ClusterManager) managers.get(ctxname);
if (mgr != null)
mgr.messageDataReceived(msg);
else
log.warn("Context manager doesn't exist:" + ctxname);
}//end if
} else {
//invoke all the listeners
for ( int i=0; i<clusterListeners.size(); i++ ) {
MessageListener listener = (MessageListener)clusterListeners.elementAt(i);
if ( myobj!=null &&
myobj instanceof ClusterMessage &&
listener.accept((ClusterMessage)myobj) ) {
listener.messageReceived((ClusterMessage)myobj);
}//end if
}//for
}//end if
} catch ( Exception x ) {
log.error("Unable to deserialize session message.",x);
}
}
public void lifecycleEvent(LifecycleEvent lifecycleEvent){
log.debug("\nlifecycleEvent\n\nType"+lifecycleEvent.getType()+
"\nData"+lifecycleEvent.getData()+
"\n\n\n");
}
// --------------------------------------------------------- Cluster Wide Deployments
/**
* Start an existing web application, attached to the specified context
* path in all the other nodes in the cluster.
* Only starts a web application if it is not running.
*
* @param contextPath The context path of the application to be started
*
* @exception IllegalArgumentException if the specified context path
* is malformed (it must be "" or start with a slash)
* @exception IllegalArgumentException if the specified context path does
* not identify a currently installed web application
* @exception IOException if an input/output error occurs during
* startup
*/
public void startContext(String contextPath) throws IOException {
return;
}
/**
* Install a new web application, whose web application archive is at the
* specified URL, into this container with the specified context path.
* A context path of "" (the empty string) should be used for the root
* application for this container. Otherwise, the context path must
* start with a slash.
* <p>
* If this application is successfully installed, a ContainerEvent of type
* <code>PRE_INSTALL_EVENT</code> will be sent to registered listeners
* before the associated Context is started, and a ContainerEvent of type
* <code>INSTALL_EVENT</code> will be sent to all registered listeners
* after the associated Context is started, with the newly created
* <code>Context</code> as an argument.
*
* @param contextPath The context path to which this application should
* be installed (must be unique)
* @param war A URL of type "jar:" that points to a WAR file, or type
* "file:" that points to an unpacked directory structure containing
* the web application to be installed
*
* @exception IllegalArgumentException if the specified context path
* is malformed (it must be "" or start with a slash)
* @exception IllegalStateException if the specified context path
* is already attached to an existing web application
*/
public void installContext(String contextPath, URL war) {
log.debug("\n\n\n\nCluster Install called for context:"+contextPath+"\n\n\n\n");
}
/**
* Stop an existing web application, attached to the specified context
* path. Only stops a web application if it is running.
*
* @param contextPath The context path of the application to be stopped
*
* @exception IllegalArgumentException if the specified context path
* is malformed (it must be "" or start with a slash)
* @exception IllegalArgumentException if the specified context path does
* not identify a currently installed web application
* @exception IOException if an input/output error occurs while stopping
* the web application
*/
public void stop(String contextPath) throws IOException {
return;
}
public Log getLogger() {
return log;
}
// --------------------------------------------- Inner Class
// --------------------------------------------- Performance
private void perfMessageRecvd(long timeSent) {
nrOfMsgsReceived++;
msgSendTime+=(System.currentTimeMillis()-timeSent);
if ( (System.currentTimeMillis() - lastChecked) > 5000 ) {
log.debug("Calc msg send time total="+msgSendTime+"ms num request="+nrOfMsgsReceived+" average per msg="+(msgSendTime/nrOfMsgsReceived)+"ms.");
}
}
public String getManagerClassName() {
return managerClassName;
}
public void setManagerClassName(String managerClassName) {
this.managerClassName = managerClassName;
}
public org.apache.catalina.cluster.ClusterSender getClusterSender() {
return clusterSender;
}
public void setClusterSender(org.apache.catalina.cluster.ClusterSender clusterSender) {
this.clusterSender = clusterSender;
}
public org.apache.catalina.cluster.ClusterReceiver getClusterReceiver() {
return clusterReceiver;
}
public void setClusterReceiver(org.apache.catalina.cluster.ClusterReceiver clusterReceiver) {
this.clusterReceiver = clusterReceiver;
}
public MembershipService getMembershipService() {
return membershipService;
}
public void setMembershipService(MembershipService membershipService) {
this.membershipService = membershipService;
}
public void addValve(Valve valve) {
this.valve = valve;
}
public void addClusterListener(MessageListener listener) {
if ( !clusterListeners.contains(listener) ) {
clusterListeners.addElement(listener);
}
}
public void removeClusterListener(MessageListener listener) {
clusterListeners.removeElement(listener);
}
public org.apache.catalina.cluster.ClusterDeployer getClusterDeployer() {
return clusterDeployer;
}
public void setClusterDeployer(org.apache.catalina.cluster.ClusterDeployer clusterDeployer) {
this.clusterDeployer = clusterDeployer;
}
private class MemberComparator implements java.util.Comparator {
public int compare(Object o1, Object o2) {
try {
return compare((Member)o1,(Member)o2);
} catch (ClassCastException x) {
return 0;
}
}
public int compare(Member m1, Member m2) {
//longer alive time, means sort first
long result = m2.getMemberAliveTime() - m1.getMemberAliveTime();
if ( result < 0 ) return -1;
else if ( result == 0 ) return 0;
else return 1;
}
}
}