blob: 8614c61fd67a388a08b3ae80171a3b58b9b0b147 [file] [log] [blame]
/*
* Copyright 1999,2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.cluster.session;
import java.io.IOException;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.Session;
import org.apache.catalina.cluster.CatalinaCluster;
import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.realm.GenericPrincipal;
/**
* Title: Tomcat Session Replication for Tomcat 4.0 <BR>
* Description: A very simple straight forward implementation of
* session replication of servers in a cluster.<BR>
* This session replication is implemented "live". By live
* I mean, when a session attribute is added into a session on Node A
* a message is broadcasted to other messages and setAttribute is called on the
* replicated sessions.<BR>
* A full description of this implementation can be found under
* <href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
*
* Copyright: See apache license
* Company: www.filip.net
* @author <a href="mailto:mail@filip.net">Filip Hanik</a>
* @author Bela Ban (modifications for synchronous replication)
* @version 1.0 for TC 4.0
* Description: The InMemoryReplicationManager is a session manager that replicated
* session information in memory. It uses <a href="www.javagroups.com">JavaGroups</a> as
* a communication protocol to ensure guaranteed and ordered message delivery.
* JavaGroups also provides a very flexible protocol stack to ensure that the replication
* can be used in any environment.
* <BR><BR>
* The InMemoryReplicationManager extends the StandardManager hence it allows for us
* to inherit all the basic session management features like expiration, session listeners etc
* <BR><BR>
* To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different type of multicast messages
* all defined in the SessionMessage class.<BR>
* When a session is replicated (not an attribute added/removed) the session is serialized into
* a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
*/
public class SimpleTcpReplicationManager extends org.apache.catalina.session.StandardManager
implements org.apache.catalina.cluster.ClusterManager
{
public static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
//the channel configuration
protected String mChannelConfig = null;
//the group name
protected String mGroupName = "TomcatReplication";
//somehow start() gets called more than once
protected boolean mChannelStarted = false;
//log to screen
protected boolean mPrintToScreen = true;
protected boolean mManagerRunning = false;
/** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
* will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
* all responses.
*/
protected boolean synchronousReplication=true;
/** Set to true if we don't want the sessions to expire on shutdown */
protected boolean mExpireSessionsOnShutdown = true;
protected boolean useDirtyFlag = false;
protected String name;
protected boolean distributable = true;
protected CatalinaCluster cluster;
protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
/**
* Flag to keep track if the state has been transferred or not
* Assumes false.
*/
protected boolean stateTransferred = false;
/**
* Constructor, just calls super()
*
*/
public SimpleTcpReplicationManager()
{
super();
}
public boolean isManagerRunning()
{
return mManagerRunning;
}
public void setUseDirtyFlag(boolean usedirtyflag)
{
this.useDirtyFlag = usedirtyflag;
}
public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
{
mExpireSessionsOnShutdown = expireSessionsOnShutdown;
}
public void setCluster(CatalinaCluster cluster) {
log.debug("Cluster associated with SimpleTcpReplicationManager");
this.cluster = cluster;
}
public boolean getExpireSessionsOnShutdown()
{
return mExpireSessionsOnShutdown;
}
public void setPrintToScreen(boolean printtoscreen)
{
log.debug("Setting screen debug to:"+printtoscreen);
mPrintToScreen = printtoscreen;
}
public void setSynchronousReplication(boolean flag)
{
synchronousReplication=flag;
}
/**
* Override persistence since they don't go hand in hand with replication for now.
*/
public void unload() throws IOException {
if ( !getDistributable() ) {
super.unload();
}
}
/**
* Creates a HTTP session.
* Most of the code in here is copied from the StandardManager.
* This is not pretty, yeah I know, but it was necessary since the
* StandardManager had hard coded the session instantiation to the a
* StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
* If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
* nodes in the cluster that this session has been created.
* @param notify - if set to true the other nodes in the cluster will be notified.
* This flag is needed so that we can create a session before we deserialize
* a replicated one
*
* @see ReplicatedSession
*/
protected Session createSession(boolean notify, boolean setId)
{
//inherited from the basic manager
if ((getMaxActiveSessions() >= 0) &&
(sessions.size() >= getMaxActiveSessions()))
throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));
Session session = new ReplicatedSession(this);
// Initialize the properties of the new session and return it
session.setNew(true);
session.setValid(true);
session.setCreationTime(System.currentTimeMillis());
session.setMaxInactiveInterval(this.maxInactiveInterval);
String sessionId = generateSessionId();
String jvmRoute = getJvmRoute();
// @todo Move appending of jvmRoute generateSessionId()???
if (jvmRoute != null) {
sessionId += '.' + jvmRoute;
}
if ( setId ) session.setId(sessionId);
if ( notify && (cluster!=null) ) {
((ReplicatedSession)session).setIsDirty(true);
}
return (session);
}//createSession
//=========================================================================
// OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
//=========================================================================
/**
* Construct and return a new session object, based on the default
* settings specified by this Manager's properties. The session
* id will be assigned by this method, and available via the getId()
* method of the returned session. If a new session cannot be created
* for any reason, return <code>null</code>.
*
* @exception IllegalStateException if a new session cannot be
* instantiated for any reason
*/
public Session createSession()
{
//create a session and notify the other nodes in the cluster
Session session = createSession(getDistributable(),true);
add(session);
return session;
}
public void sessionInvalidated(String sessionId) {
synchronized ( invalidatedSessions ) {
invalidatedSessions.put(sessionId, sessionId);
}
}
public String[] getInvalidatedSessions() {
synchronized ( invalidatedSessions ) {
String[] result = new String[invalidatedSessions.size()];
invalidatedSessions.values().toArray(result);
return result;
}
}
public ClusterMessage requestCompleted(String sessionId)
{
if ( !getDistributable() ) {
log.warn("Received requestCompleted message, although this context["+
getName()+"] is not distributable. Ignoring message");
return null;
}
//notify javagroups
try
{
if ( invalidatedSessions.get(sessionId) != null ) {
synchronized ( invalidatedSessions ) {
invalidatedSessions.remove(sessionId);
SessionMessage msg = new SessionMessageImpl(name,
SessionMessage.EVT_SESSION_EXPIRED,
null,
sessionId,
sessionId);
return msg;
}
} else {
ReplicatedSession session = (ReplicatedSession) findSession(
sessionId);
if (session != null) {
//return immediately if the session is not dirty
if (useDirtyFlag && (!session.isDirty())) {
//but before we return doing nothing,
//see if we should send
//an updated last access message so that
//sessions across cluster dont expire
long interval = session.getMaxInactiveInterval();
long lastaccdist = System.currentTimeMillis() -
session.getLastAccessWasDistributed();
if ( ((interval*1000) / lastaccdist)< 3 ) {
SessionMessage accmsg = new SessionMessageImpl(name,
SessionMessage.EVT_SESSION_ACCESSED,
null,
sessionId,
sessionId);
session.setLastAccessWasDistributed(System.currentTimeMillis());
return accmsg;
}
return null;
}
session.setIsDirty(false);
if (log.isDebugEnabled()) {
try {
log.debug("Sending session to cluster=" + session);
}
catch (Exception ignore) {}
}
SessionMessage msg = new SessionMessageImpl(name,
SessionMessage.EVT_SESSION_CREATED,
writeSession(session),
session.getId(),
session.getId());
return msg;
} //end if
}//end if
}
catch (Exception x )
{
log.error("Unable to replicate session",x);
}
return null;
}
/**
* Serialize a session into a byte array<BR>
* This method simple calls the writeObjectData method on the session
* and returns the byte data from that call
* @param session - the session to be serialized
* @return a byte array containing the session data, null if the serialization failed
*/
protected byte[] writeSession( Session session )
{
try
{
java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
session_out.flush();
boolean hasPrincipal = session.getPrincipal() != null;
session_out.writeBoolean(hasPrincipal);
if ( hasPrincipal )
{
session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
}//end if
((ReplicatedSession)session).writeObjectData(session_out);
return session_data.toByteArray();
}
catch ( Exception x )
{
log.error("Failed to serialize the session!",x);
}
return null;
}
/**
* Reinstantiates a serialized session from the data passed in.
* This will first call createSession() so that we get a fresh instance with all
* the managers set and all the transient fields validated.
* Then it calls Session.readObjectData(byte[]) to deserialize the object
* @param data - a byte array containing session data
* @return a valid Session object, null if an error occurs
*
*/
protected Session readSession( byte[] data, String sessionId )
{
try
{
java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream(data);
ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader());
Session session = sessionId!=null?this.findSession(sessionId):null;
boolean isNew = (session==null);
//clear the old values from the existing session
if ( session!=null ) {
ReplicatedSession rs = (ReplicatedSession)session;
rs.expire(false); //cleans up the previous values, since we are not doing removes
session = null;
}//end if
if (session==null) {
session = createSession(false, false);
sessions.remove(session.getId());
}
boolean hasPrincipal = session_in.readBoolean();
SerializablePrincipal p = null;
if ( hasPrincipal )
p = (SerializablePrincipal)session_in.readObject();
((ReplicatedSession)session).readObjectData(session_in);
if ( hasPrincipal )
session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
((ReplicatedSession)session).setId(sessionId,isNew);
ReplicatedSession rsession = (ReplicatedSession)session;
rsession.setAccessCount(1);
session.setManager(this);
session.setValid(true);
rsession.setLastAccessedTime(System.currentTimeMillis());
rsession.setThisAccessedTime(System.currentTimeMillis());
((ReplicatedSession)session).setAccessCount(0);
session.setNew(false);
// System.out.println("Session loaded id="+sessionId +
// " actualId="+session.getId()+
// " exists="+this.sessions.containsKey(sessionId)+
// " valid="+rsession.isValid());
return session;
}
catch ( Exception x )
{
log.error("Failed to deserialize the session!",x);
}
return null;
}
public String getName() {
return this.name;
}
/**
* Prepare for the beginning of active use of the public methods of this
* component. This method should be called after <code>configure()</code>,
* and before any of the public methods of the component are utilized.<BR>
* Starts the cluster communication channel, this will connect with the other nodes
* in the cluster, and request the current session state to be transferred to this node.
* @exception IllegalStateException if this component has already been
* started
* @exception LifecycleException if this component detects a fatal error
* that prevents this component from being used
*/
public void start() throws LifecycleException {
mManagerRunning = true;
super.start();
//start the javagroups channel
try {
//the channel is already running
if ( mChannelStarted ) return;
log.error("Starting clustering manager...:"+getName());
if ( cluster == null ) {
log.error("Starting... no cluster associated with this context:"+getName());
return;
}
if (cluster.getMembers().length > 0) {
Member mbr = cluster.getMembers()[0];
SessionMessage msg =
new SessionMessageImpl(this.getName(),
SessionMessage.EVT_GET_ALL_SESSIONS,
null,
"GET-ALL",
"GET-ALL-"+this.getName());
cluster.send(msg, mbr);
log.warn("Manager["+getName()+"], requesting session state from "+mbr+
". This operation will timeout if no session state has been received within "+
"60 seconds");
long reqStart = System.currentTimeMillis();
long reqNow = 0;
boolean isTimeout=false;
do {
try {
Thread.sleep(100);
}catch ( Exception sleep) {}
reqNow = System.currentTimeMillis();
isTimeout=((reqNow-reqStart)>(1000*60));
} while ( (!isStateTransferred()) && (!isTimeout));
if ( isTimeout || (!isStateTransferred()) ) {
log.error("Manager["+getName()+"], No session state received, timing out.");
}else {
log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
}
} else {
log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
}//end if
mChannelStarted = true;
} catch ( Exception x ) {
log.error("Unable to start SimpleTcpReplicationManager",x);
}
}
/**
* Gracefully terminate the active use of the public methods of this
* component. This method should be the last one called on a given
* instance of this component.<BR>
* This will disconnect the cluster communication channel and stop the listener thread.
* @exception IllegalStateException if this component has not been started
* @exception LifecycleException if this component detects a fatal error
* that needs to be reported
*/
public void stop() throws LifecycleException
{
mManagerRunning = false;
mChannelStarted = false;
super.stop();
//stop the javagroup channel
try
{
cluster.removeManager(getName());
// mReplicationListener.stopListening();
// mReplicationTransmitter.stop();
// service.stop();
// service = null;
}
catch ( Exception x )
{
log.error("Unable to stop SimpleTcpReplicationManager",x);
}
}
public void setDistributable(boolean dist) {
this.distributable = dist;
}
public boolean getDistributable() {
return distributable;
}
/**
* This method is called by the received thread when a SessionMessage has
* been received from one of the other nodes in the cluster.
* @param msg - the message received
* @param sender - the sender of the message, this is used if we receive a
* EVT_GET_ALL_SESSION message, so that we only reply to
* the requesting node
*/
protected void messageReceived( SessionMessage msg, Member sender ) {
try {
log.debug("Received SessionMessage of type="+msg.getEventTypeString());
log.debug("Received SessionMessage sender="+sender);
switch ( msg.getEventType() ) {
case SessionMessage.EVT_GET_ALL_SESSIONS: {
//get a list of all the session from this manager
Object[] sessions = findSessions();
java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
oout.writeInt(sessions.length);
for (int i=0; i<sessions.length; i++){
ReplicatedSession ses = (ReplicatedSession)sessions[i];
oout.writeUTF(ses.getId());
byte[] data = writeSession(ses);
oout.writeObject(data);
}//for
//don't send a message if we don't have to
oout.flush();
oout.close();
byte[] data = bout.toByteArray();
SessionMessage newmsg = new SessionMessageImpl(name,
SessionMessage.EVT_ALL_SESSION_DATA,
data, "SESSION-STATE","SESSION-STATE-"+getName());
cluster.send(newmsg, sender);
break;
}
case SessionMessage.EVT_ALL_SESSION_DATA: {
java.io.ByteArrayInputStream bin =
new java.io.ByteArrayInputStream(msg.getSession());
java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
int size = oin.readInt();
for ( int i=0; i<size; i++) {
String id = oin.readUTF();
byte[] data = (byte[])oin.readObject();
Session session = readSession(data,id);
}//for
stateTransferred=true;
break;
}
case SessionMessage.EVT_SESSION_CREATED: {
Session session = this.readSession(msg.getSession(),msg.getSessionID());
if ( log.isDebugEnabled() ) {
log.debug("Received replicated session=" + session +
" isValid=" + session.isValid());
}
break;
}
case SessionMessage.EVT_SESSION_EXPIRED: {
Session session = findSession(msg.getSessionID());
if ( session != null ) {
session.expire();
this.remove(session);
}//end if
break;
}
case SessionMessage.EVT_SESSION_ACCESSED :{
Session session = findSession(msg.getSessionID());
if ( session != null ) {
session.access();
session.endAccess();
}
break;
}
default: {
//we didn't recognize the message type, do nothing
break;
}
}//switch
}
catch ( Exception x )
{
log.error("Unable to receive message through TCP channel",x);
}
}
public void messageDataReceived(ClusterMessage cmsg) {
try {
if ( cmsg instanceof SessionMessage ) {
SessionMessage msg = (SessionMessage)cmsg;
messageReceived(msg,
msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
} catch(Throwable ex){
log.error("InMemoryReplicationManager.messageDataReceived()", ex);
}//catch
}
public boolean isStateTransferred() {
return stateTransferred;
}
public void setName(String name) {
this.name = name;
}
}