| /* |
| * Copyright 1999,2004-2006 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.catalina.tribes.group; |
| |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| |
| import org.apache.catalina.tribes.ByteMessage; |
| import org.apache.catalina.tribes.Channel; |
| import org.apache.catalina.tribes.ChannelException; |
| import org.apache.catalina.tribes.ChannelInterceptor; |
| import org.apache.catalina.tribes.ChannelListener; |
| import org.apache.catalina.tribes.ChannelMessage; |
| import org.apache.catalina.tribes.ChannelReceiver; |
| import org.apache.catalina.tribes.ChannelSender; |
| import org.apache.catalina.tribes.ErrorHandler; |
| import org.apache.catalina.tribes.ManagedChannel; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.MembershipListener; |
| import org.apache.catalina.tribes.MembershipService; |
| import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; |
| import org.apache.catalina.tribes.io.ChannelData; |
| import org.apache.catalina.tribes.io.XByteBuffer; |
| import org.apache.catalina.tribes.UniqueId; |
| import org.apache.catalina.tribes.Heartbeat; |
| import org.apache.catalina.tribes.io.BufferPool; |
| import java.io.IOException; |
| import org.apache.catalina.tribes.RemoteProcessException; |
| import org.apache.catalina.tribes.util.Logs; |
| import org.apache.catalina.tribes.util.Arrays; |
| |
| /** |
| * The default implementation of a Channel.<br> |
| * The GroupChannel manages the replication channel. It coordinates |
| * message being sent and received with membership announcements. |
| * The channel has an chain of interceptors that can modify the message or perform other logic.<br> |
| * It manages a complete group, both membership and replication. |
| * @author Filip Hanik |
| * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ |
| */ |
| public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel { |
| /** |
| * Flag to determine if the channel manages its own heartbeat |
| * If set to true, the channel will start a local thread for the heart beat. |
| */ |
| protected boolean heartbeat = true; |
| /** |
| * If <code>heartbeat == true</code> then how often do we want this |
| * heartbeat to run. default is one minute |
| */ |
| protected long heartbeatSleeptime = 5*1000;//every 5 seconds |
| |
| /** |
| * Internal heartbeat thread |
| */ |
| protected HeartbeatThread hbthread = null; |
| |
| /** |
| * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> |
| * - MembershipService<br> |
| * - ChannelSender <br> |
| * - ChannelReceiver<br> |
| */ |
| protected ChannelCoordinator coordinator = new ChannelCoordinator(); |
| |
| /** |
| * The first interceptor in the inteceptor stack. |
| * The interceptors are chained in a linked list, so we only need a reference to the |
| * first one |
| */ |
| protected ChannelInterceptor interceptors = null; |
| |
| /** |
| * A list of membership listeners that subscribe to membership announcements |
| */ |
| protected ArrayList membershipListeners = new ArrayList(); |
| |
| /** |
| * A list of channel listeners that subscribe to incoming messages |
| */ |
| protected ArrayList channelListeners = new ArrayList(); |
| |
| /** |
| * If set to true, the GroupChannel will check to make sure that |
| */ |
| protected boolean optionCheck = false; |
| |
| /** |
| * Creates a GroupChannel. This constructor will also |
| * add the first interceptor in the GroupChannel.<br> |
| * The first interceptor is always the channel itself. |
| */ |
| public GroupChannel() { |
| addInterceptor(this); |
| } |
| |
| |
| /** |
| * Adds an interceptor to the stack for message processing<br> |
| * Interceptors are ordered in the way they are added.<br> |
| * <code>channel.addInterceptor(A);</code><br> |
| * <code>channel.addInterceptor(C);</code><br> |
| * <code>channel.addInterceptor(B);</code><br> |
| * Will result in a interceptor stack like this:<br> |
| * <code>A -> C -> B</code><br> |
| * The complete stack will look like this:<br> |
| * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br> |
| * @param interceptor ChannelInterceptorBase |
| */ |
| public void addInterceptor(ChannelInterceptor interceptor) { |
| if ( interceptors == null ) { |
| interceptors = interceptor; |
| interceptors.setNext(coordinator); |
| interceptors.setPrevious(null); |
| coordinator.setPrevious(interceptors); |
| } else { |
| ChannelInterceptor last = interceptors; |
| while ( last.getNext() != coordinator ) { |
| last = last.getNext(); |
| } |
| last.setNext(interceptor); |
| interceptor.setNext(coordinator); |
| interceptor.setPrevious(last); |
| coordinator.setPrevious(interceptor); |
| } |
| } |
| |
| /** |
| * Sends a heartbeat through the interceptor stack.<br> |
| * Invoke this method from the application on a periodic basis if |
| * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code> |
| */ |
| public void heartbeat() { |
| super.heartbeat(); |
| Iterator i = membershipListeners.iterator(); |
| while ( i.hasNext() ) { |
| Object o = i.next(); |
| if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat(); |
| } |
| i = channelListeners.iterator(); |
| while ( i.hasNext() ) { |
| Object o = i.next(); |
| if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat(); |
| } |
| |
| } |
| |
| |
| /** |
| * Send a message to the destinations specified |
| * @param destination Member[] - destination.length > 1 |
| * @param msg Serializable - the message to send |
| * @param options int - sender options, options can trigger guarantee levels and different interceptors to |
| * react to the message see class documentation for the <code>Channel</code> object.<br> |
| * @return UniqueId - the unique Id that was assigned to this message |
| * @throws ChannelException - if an error occurs processing the message |
| * @see org.apache.catalina.tribes.Channel |
| */ |
| public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException { |
| return send(destination,msg,options,null); |
| } |
| |
| /** |
| * |
| * @param destination Member[] - destination.length > 1 |
| * @param msg Serializable - the message to send |
| * @param options int - sender options, options can trigger guarantee levels and different interceptors to |
| * react to the message see class documentation for the <code>Channel</code> object.<br> |
| * @param handler - callback object for error handling and completion notification, used when a message is |
| * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled. |
| * @return UniqueId - the unique Id that was assigned to this message |
| * @throws ChannelException - if an error occurs processing the message |
| * @see org.apache.catalina.tribes.Channel |
| */ |
| public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException { |
| if ( msg == null ) throw new ChannelException("Cant send a NULL message"); |
| XByteBuffer buffer = null; |
| try { |
| if ( destination == null || destination.length == 0) throw new ChannelException("No destination given"); |
| ChannelData data = new ChannelData(true);//generates a unique Id |
| data.setAddress(getLocalMember(false)); |
| data.setTimestamp(System.currentTimeMillis()); |
| byte[] b = null; |
| if ( msg instanceof ByteMessage ){ |
| b = ((ByteMessage)msg).getMessage(); |
| options = options | SEND_OPTIONS_BYTE_MESSAGE; |
| } else { |
| b = XByteBuffer.serialize(msg); |
| options = options & (~SEND_OPTIONS_BYTE_MESSAGE); |
| } |
| data.setOptions(options); |
| //XByteBuffer buffer = new XByteBuffer(b.length+128,false); |
| buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false); |
| buffer.append(b,0,b.length); |
| data.setMessage(buffer); |
| InterceptorPayload payload = null; |
| if ( handler != null ) { |
| payload = new InterceptorPayload(); |
| payload.setErrorHandler(handler); |
| } |
| getFirstInterceptor().sendMessage(destination, data, payload); |
| if ( Logs.MESSAGES.isTraceEnabled() ) { |
| Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination)); |
| Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " +msg); |
| } |
| |
| return new UniqueId(data.getUniqueId()); |
| }catch ( Exception x ) { |
| if ( x instanceof ChannelException ) throw (ChannelException)x; |
| throw new ChannelException(x); |
| } finally { |
| if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer); |
| } |
| } |
| |
| |
| /** |
| * Callback from the interceptor stack. <br> |
| * When a message is received from a remote node, this method will be invoked by |
| * the previous interceptor.<br> |
| * This method can also be used to send a message to other components within the same application, |
| * but its an extreme case, and you're probably better off doing that logic between the applications itself. |
| * @param msg ChannelMessage |
| */ |
| public void messageReceived(ChannelMessage msg) { |
| if ( msg == null ) return; |
| try { |
| if ( Logs.MESSAGES.isTraceEnabled() ) { |
| Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName()); |
| } |
| |
| Serializable fwd = null; |
| if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) { |
| fwd = new ByteMessage(msg.getMessage().getBytes()); |
| } else { |
| fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength()); |
| } |
| if ( Logs.MESSAGES.isTraceEnabled() ) { |
| Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd); |
| } |
| |
| //get the actual member with the correct alive time |
| Member source = msg.getAddress(); |
| boolean rx = false; |
| boolean delivered = false; |
| for ( int i=0; i<channelListeners.size(); i++ ) { |
| ChannelListener channelListener = (ChannelListener)channelListeners.get(i); |
| if (channelListener != null && channelListener.accept(fwd, source)) { |
| channelListener.messageReceived(fwd, source); |
| delivered = true; |
| //if the message was accepted by an RPC channel, that channel |
| //is responsible for returning the reply, otherwise we send an absence reply |
| if ( channelListener instanceof RpcChannel ) rx = true; |
| } |
| }//for |
| if ((!rx) && (fwd instanceof RpcMessage)) { |
| //if we have a message that requires a response, |
| //but none was given, send back an immediate one |
| sendNoRpcChannelReply((RpcMessage)fwd,source); |
| } |
| if ( Logs.MESSAGES.isTraceEnabled() ) { |
| Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId())); |
| } |
| |
| } catch ( Exception x ) { |
| if ( log.isDebugEnabled() ) log.error("Unable to process channel:IOException.",x); |
| throw new RemoteProcessException("IOException:"+x.getMessage(),x); |
| } |
| } |
| |
| /** |
| * Sends a <code>NoRpcChannelReply</code> message to a member<br> |
| * This method gets invoked by the channel if a RPC message comes in |
| * and no channel listener accepts the message. This avoids timeout |
| * @param msg RpcMessage |
| * @param destination Member - the destination for the reply |
| */ |
| protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) { |
| try { |
| //avoid circular loop |
| if ( msg instanceof RpcMessage.NoRpcChannelReply) return; |
| RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid); |
| send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS); |
| } catch ( Exception x ) { |
| log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x); |
| } |
| } |
| |
| /** |
| * memberAdded gets invoked by the interceptor below the channel |
| * and the channel will broadcast it to the membership listeners |
| * @param member Member - the new member |
| */ |
| public void memberAdded(Member member) { |
| //notify upwards |
| for (int i=0; i<membershipListeners.size(); i++ ) { |
| MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i); |
| if (membershipListener != null) membershipListener.memberAdded(member); |
| } |
| } |
| |
| /** |
| * memberDisappeared gets invoked by the interceptor below the channel |
| * and the channel will broadcast it to the membership listeners |
| * @param member Member - the member that left or crashed |
| */ |
| public void memberDisappeared(Member member) { |
| //notify upwards |
| for (int i=0; i<membershipListeners.size(); i++ ) { |
| MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i); |
| if (membershipListener != null) membershipListener.memberDisappeared(member); |
| } |
| } |
| |
| /** |
| * Sets up the default implementation interceptor stack |
| * if no interceptors have been added |
| * @throws ChannelException |
| */ |
| protected synchronized void setupDefaultStack() throws ChannelException { |
| |
| if ( getFirstInterceptor() != null && |
| ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) { |
| ChannelInterceptor interceptor = null; |
| Class clazz = null; |
| try { |
| clazz = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor", |
| true,GroupChannel.class.getClassLoader()); |
| clazz.newInstance(); |
| } catch ( Throwable x ) { |
| clazz = MessageDispatchInterceptor.class; |
| }//catch |
| try { |
| interceptor = (ChannelInterceptor) clazz.newInstance(); |
| } catch (Exception x) { |
| throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x); |
| } |
| this.addInterceptor(interceptor); |
| } |
| } |
| |
| /** |
| * Validates the option flags that each interceptor is using and reports |
| * an error if two interceptor share the same flag. |
| * @throws ChannelException |
| */ |
| protected void checkOptionFlags() throws ChannelException { |
| StringBuffer conflicts = new StringBuffer(); |
| ChannelInterceptor first = interceptors; |
| while ( first != null ) { |
| int flag = first.getOptionFlag(); |
| if ( flag != 0 ) { |
| ChannelInterceptor next = first.getNext(); |
| while ( next != null ) { |
| int nflag = next.getOptionFlag(); |
| if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) { |
| conflicts.append("["); |
| conflicts.append(first.getClass().getName()); |
| conflicts.append(":"); |
| conflicts.append(flag); |
| conflicts.append(" == "); |
| conflicts.append(next.getClass().getName()); |
| conflicts.append(":"); |
| conflicts.append(nflag); |
| conflicts.append("] "); |
| }//end if |
| next = next.getNext(); |
| }//while |
| }//end if |
| first = first.getNext(); |
| }//while |
| if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString()); |
| |
| } |
| |
| /** |
| * Starts the channel |
| * @param svc int - what service to start |
| * @throws ChannelException |
| * @see org.apache.catalina.tribes.Channel#start(int) |
| */ |
| public synchronized void start(int svc) throws ChannelException { |
| setupDefaultStack(); |
| if (optionCheck) checkOptionFlags(); |
| super.start(svc); |
| if ( hbthread == null && heartbeat ) { |
| hbthread = new HeartbeatThread(this,heartbeatSleeptime); |
| hbthread.start(); |
| } |
| } |
| |
| /** |
| * Stops the channel |
| * @param svc int |
| * @throws ChannelException |
| * @see org.apache.catalina.tribes.Channel#stop(int) |
| */ |
| public synchronized void stop(int svc) throws ChannelException { |
| if (hbthread != null) { |
| hbthread.stopHeartbeat(); |
| hbthread = null; |
| } |
| super.stop(svc); |
| } |
| |
| /** |
| * Returns the first interceptor of the stack. Useful for traversal. |
| * @return ChannelInterceptor |
| */ |
| public ChannelInterceptor getFirstInterceptor() { |
| if (interceptors != null) return interceptors; |
| else return coordinator; |
| } |
| |
| /** |
| * Returns the channel receiver component |
| * @return ChannelReceiver |
| */ |
| public ChannelReceiver getChannelReceiver() { |
| return coordinator.getClusterReceiver(); |
| } |
| |
| /** |
| * Returns the channel sender component |
| * @return ChannelSender |
| */ |
| public ChannelSender getChannelSender() { |
| return coordinator.getClusterSender(); |
| } |
| |
| /** |
| * Returns the membership service component |
| * @return MembershipService |
| */ |
| public MembershipService getMembershipService() { |
| return coordinator.getMembershipService(); |
| } |
| |
| /** |
| * Sets the channel receiver component |
| * @param clusterReceiver ChannelReceiver |
| */ |
| public void setChannelReceiver(ChannelReceiver clusterReceiver) { |
| coordinator.setClusterReceiver(clusterReceiver); |
| } |
| |
| /** |
| * Sets the channel sender component |
| * @param clusterSender ChannelSender |
| */ |
| public void setChannelSender(ChannelSender clusterSender) { |
| coordinator.setClusterSender(clusterSender); |
| } |
| |
| /** |
| * Sets the membership component |
| * @param membershipService MembershipService |
| */ |
| public void setMembershipService(MembershipService membershipService) { |
| coordinator.setMembershipService(membershipService); |
| } |
| |
| /** |
| * Adds a membership listener to the channel.<br> |
| * Membership listeners are uniquely identified using the equals(Object) method |
| * @param membershipListener MembershipListener |
| */ |
| public void addMembershipListener(MembershipListener membershipListener) { |
| if (!this.membershipListeners.contains(membershipListener) ) |
| this.membershipListeners.add(membershipListener); |
| } |
| |
| /** |
| * Removes a membership listener from the channel.<br> |
| * Membership listeners are uniquely identified using the equals(Object) method |
| * @param membershipListener MembershipListener |
| */ |
| |
| public void removeMembershipListener(MembershipListener membershipListener) { |
| membershipListeners.remove(membershipListener); |
| } |
| |
| /** |
| * Adds a channel listener to the channel.<br> |
| * Channel listeners are uniquely identified using the equals(Object) method |
| * @param channelListener ChannelListener |
| */ |
| public void addChannelListener(ChannelListener channelListener) { |
| if (!this.channelListeners.contains(channelListener) ) { |
| this.channelListeners.add(channelListener); |
| } else { |
| throw new IllegalArgumentException("Listener already exists:"+channelListener); |
| } |
| } |
| |
| /** |
| * |
| * Removes a channel listener from the channel.<br> |
| * Channel listeners are uniquely identified using the equals(Object) method |
| * @param channelListener ChannelListener |
| */ |
| public void removeChannelListener(ChannelListener channelListener) { |
| channelListeners.remove(channelListener); |
| } |
| |
| /** |
| * Returns an iterator of all the interceptors in this stack |
| * @return Iterator |
| */ |
| public Iterator getInterceptors() { |
| return new InterceptorIterator(this.getNext(),this.coordinator); |
| } |
| |
| /** |
| * Enables/disables the option check<br> |
| * Setting this to true, will make the GroupChannel perform a conflict check |
| * on the interceptors. If two interceptors are using the same option flag |
| * and throw an error upon start. |
| * @param optionCheck boolean |
| */ |
| public void setOptionCheck(boolean optionCheck) { |
| this.optionCheck = optionCheck; |
| } |
| |
| /** |
| * Configure local heartbeat sleep time<br> |
| * Only used when <code>getHeartbeat()==true</code> |
| * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats |
| */ |
| public void setHeartbeatSleeptime(long heartbeatSleeptime) { |
| this.heartbeatSleeptime = heartbeatSleeptime; |
| } |
| |
| /** |
| * Enables or disables local heartbeat. |
| * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal |
| * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds |
| * @param heartbeat boolean |
| */ |
| public void setHeartbeat(boolean heartbeat) { |
| this.heartbeat = heartbeat; |
| } |
| |
| /** |
| * @see #setOptionCheck(boolean) |
| * @return boolean |
| */ |
| public boolean getOptionCheck() { |
| return optionCheck; |
| } |
| |
| /** |
| * @see #setHeartbeat(boolean) |
| * @return boolean |
| */ |
| public boolean getHeartbeat() { |
| return heartbeat; |
| } |
| |
| /** |
| * Returns the sleep time in milliseconds that the internal heartbeat will |
| * sleep in between invokations of <code>Channel.heartbeat()</code> |
| * @return long |
| */ |
| public long getHeartbeatSleeptime() { |
| return heartbeatSleeptime; |
| } |
| |
| /** |
| * |
| * <p>Title: Interceptor Iterator</p> |
| * |
| * <p>Description: An iterator to loop through the interceptors in a channel</p> |
| * |
| * @version 1.0 |
| */ |
| public static class InterceptorIterator implements Iterator { |
| private ChannelInterceptor end; |
| private ChannelInterceptor start; |
| public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) { |
| this.end = end; |
| this.start = start; |
| } |
| |
| public boolean hasNext() { |
| return start!=null && start != end; |
| } |
| |
| public Object next() { |
| Object result = null; |
| if ( hasNext() ) { |
| result = start; |
| start = start.getNext(); |
| } |
| return result; |
| } |
| |
| public void remove() { |
| //empty operation |
| } |
| } |
| |
| /** |
| * |
| * <p>Title: Internal heartbeat thread</p> |
| * |
| * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class |
| * is created</p> |
| * |
| * @version 1.0 |
| */ |
| public static class HeartbeatThread extends Thread { |
| protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(HeartbeatThread.class); |
| protected static int counter = 1; |
| protected static synchronized int inc() { |
| return counter++; |
| } |
| |
| protected boolean doRun = true; |
| protected GroupChannel channel; |
| protected long sleepTime; |
| public HeartbeatThread(GroupChannel channel, long sleepTime) { |
| super(); |
| this.setPriority(MIN_PRIORITY); |
| setName("GroupChannel-Heartbeat-"+inc()); |
| setDaemon(true); |
| this.channel = channel; |
| this.sleepTime = sleepTime; |
| } |
| public void stopHeartbeat() { |
| doRun = false; |
| interrupt(); |
| } |
| |
| public void run() { |
| while (doRun) { |
| try { |
| Thread.sleep(sleepTime); |
| channel.heartbeat(); |
| } catch ( InterruptedException x ) { |
| interrupted(); |
| } catch ( Exception x ) { |
| log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x); |
| }//catch |
| }//while |
| }//run |
| }//HeartbeatThread |
| |
| |
| |
| } |