| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| */ |
| package org.apache.catalina.tribes.group.interceptors; |
| |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.catalina.tribes.Channel; |
| import org.apache.catalina.tribes.ChannelException; |
| import org.apache.catalina.tribes.ChannelInterceptor; |
| import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; |
| import org.apache.catalina.tribes.ChannelMessage; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.UniqueId; |
| import org.apache.catalina.tribes.group.AbsoluteOrder; |
| import org.apache.catalina.tribes.group.ChannelInterceptorBase; |
| import org.apache.catalina.tribes.group.InterceptorPayload; |
| import org.apache.catalina.tribes.io.ChannelData; |
| import org.apache.catalina.tribes.io.XByteBuffer; |
| import org.apache.catalina.tribes.membership.MemberImpl; |
| import org.apache.catalina.tribes.membership.Membership; |
| import org.apache.catalina.tribes.util.Arrays; |
| import org.apache.catalina.tribes.util.UUIDGenerator; |
| |
| /** |
| * <p>Title: Auto merging leader election algorithm</p> |
| * |
| * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator, |
| * it also merges groups automatically when members are discovered that werent part of the |
| * </p> |
| * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on |
| * </p> |
| * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership |
| * to pass a token ring of the current membership.<br> |
| * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br> |
| * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all |
| * nodes are receiving pings from all the other nodes. |
| * meaning, that node{i} receives pings from node{all}-node{i}<br> |
| * but the following could happen if a multicast problem occurs. |
| * A has members {B,C,D}<br> |
| * B has members {A,C}<br> |
| * C has members {D,E}<br> |
| * D has members {A,B,C,E}<br> |
| * E has members {A,C,D}<br> |
| * Because the default Tribes membership implementation, relies on the multicast packets to |
| * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br> |
| * <br> |
| * To best explain how this algorithm works, lets take the above example: |
| * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work |
| * where messages overlap, as they all depend on absolute order<br> |
| * Scenario 1: A,B,C,D,E all come online at the same time |
| * Eval phase, A thinks of itself as leader, B thinks of A as leader, |
| * C thinks of itself as leader, D,E think of A as leader<br> |
| * Token phase:<br> |
| * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br> |
| * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br> |
| * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br> |
| * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br> |
| * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br> |
| * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br> |
| * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br> |
| * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br> |
| * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br> |
| * At this point, the state looks like<br> |
| * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br> |
| * B - {A-ldr, mbrs-A,B,C,D, id=X}<br> |
| * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br> |
| * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br> |
| * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br> |
| * <br> |
| * A message doesn't stop until it reaches its original sender, unless its dropped by a higher leader. |
| * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have |
| * arrived at the same membership and all nodes are informed of each other.<br> |
| * To synchronize the rest we simply perform the following check at A when A receives X:<br> |
| * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br> |
| * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B |
| * When A receives X again, the token is complete. <br> |
| * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then |
| * install and accept the view. |
| * </p> |
| * <p> |
| * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br> |
| * Lets also assume that C1 sees the following view {B,D,E}<br> |
| * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br> |
| * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br> |
| * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br> |
| * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br> |
| * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br> |
| * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. |
| * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E |
| * </p> |
| * <p>To ensure that the view gets implemented at all nodes at the same time, |
| * A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above. |
| * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p> |
| * |
| * <p>The example above, of course can be simplified with a finite statemachine:<br> |
| * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br> |
| * Maybe I'll do a state diagram :) |
| * </p> |
| * <h2>State Diagrams</h2> |
| * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br> |
| * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br> |
| * |
| * @author Filip Hanik |
| * @version 1.0 |
| * |
| * |
| * |
| */ |
| public class NonBlockingCoordinator extends ChannelInterceptorBase { |
| |
| /** |
| * header for a coordination message |
| */ |
| protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63}; |
| /** |
| * Coordination request |
| */ |
| protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30}; |
| /** |
| * Coordination confirmation, for blocking installations |
| */ |
| protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20}; |
| |
| /** |
| * Alive message |
| */ |
| protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46, |
| -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111, |
| 74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26, |
| 119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55}; |
| /** |
| * Time to wait for coordination timeout |
| */ |
| protected long waitForCoordMsgTimeout = 15000; |
| /** |
| * Our current view |
| */ |
| protected Membership view = null; |
| /** |
| * Out current viewId |
| */ |
| protected UniqueId viewId; |
| |
| /** |
| * Our nonblocking membership |
| */ |
| protected Membership membership = null; |
| |
| /** |
| * indicates that we are running an election |
| * and this is the one we are running |
| */ |
| protected UniqueId suggestedviewId; |
| protected Membership suggestedView; |
| |
| protected boolean started = false; |
| protected final int startsvc = 0xFFFF; |
| |
| protected Object electionMutex = new Object(); |
| |
| protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false); |
| |
| public NonBlockingCoordinator() { |
| super(); |
| } |
| |
| //============================================================================================================ |
| // COORDINATION HANDLING |
| //============================================================================================================ |
| |
| public void startElection(boolean force) throws ChannelException { |
| synchronized (electionMutex) { |
| MemberImpl local = (MemberImpl)getLocalMember(false); |
| MemberImpl[] others = (MemberImpl[])membership.getMembers(); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated")); |
| if ( others.length == 0 ) { |
| this.viewId = new UniqueId(UUIDGenerator.randomUUID(false)); |
| this.view = new Membership(local,AbsoluteOrder.comp, true); |
| this.handleViewConf(this.createElectionMsg(local,others,local),local,view); |
| return; //the only member, no need for an election |
| } |
| if ( suggestedviewId != null ) { |
| |
| if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) { |
| suggestedviewId = null; |
| suggestedView = null; |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view")); |
| } else { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running")); |
| } |
| return; //election already running, I'm not allowed to have two of them |
| } |
| if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership")); |
| return; //already have this view installed |
| } |
| int prio = AbsoluteOrder.comp.compare(local,others[0]); |
| MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view? |
| if ( local.equals(leader) || force ) { |
| CoordinationMessage msg = createElectionMsg(local, others, leader); |
| suggestedviewId = msg.getId(); |
| suggestedView = new Membership(local,AbsoluteOrder.comp,true); |
| Arrays.fill(suggestedView,msg.getMembers()); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request")); |
| sendElectionMsg(local,others[0],msg); |
| } else { |
| try { |
| coordMsgReceived.set(false); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request")); |
| electionMutex.wait(waitForCoordMsgTimeout); |
| }catch ( InterruptedException x ) { |
| Thread.currentThread().interrupted(); |
| } |
| if ( suggestedviewId == null && (!coordMsgReceived.get())) { |
| //no message arrived, send the coord msg |
| // fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out.")); |
| // startElection(true); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out.")); |
| } else { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message")); |
| } |
| }//end if |
| |
| } |
| } |
| |
| private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) { |
| Membership m = new Membership(local,AbsoluteOrder.comp,true); |
| Arrays.fill(m,others); |
| MemberImpl[] mbrs = m.getMembers(); |
| m.reset(); |
| CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST); |
| return msg; |
| } |
| |
| protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")")); |
| super.sendMessage(new Member[] {next}, createData(msg, local), null); |
| } |
| |
| protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { |
| int next = Arrays.nextIndex(local,msg.getMembers()); |
| int current = next; |
| msg.leader = msg.getMembers()[0]; |
| boolean sent = false; |
| while ( !sent && current >= 0 ) { |
| try { |
| sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg); |
| sent = true; |
| }catch ( ChannelException x ) { |
| log.warn("Unable to send election message to:"+msg.getMembers()[current]); |
| current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers()); |
| if ( current == next ) throw x; |
| } |
| } |
| } |
| |
| public Member getNextInLine(MemberImpl local, MemberImpl[] others) { |
| MemberImpl result = null; |
| for ( int i=0; i<others.length; i++ ) { |
| |
| } |
| return result; |
| } |
| |
| public ChannelData createData(CoordinationMessage msg, MemberImpl local) { |
| msg.write(); |
| ChannelData data = new ChannelData(true); |
| data.setAddress(local); |
| data.setMessage(msg.getBuffer()); |
| data.setOptions(Channel.SEND_OPTIONS_USE_ACK); |
| data.setTimestamp(System.currentTimeMillis()); |
| return data; |
| } |
| |
| protected void viewChange(UniqueId viewId, Member[] view) { |
| //invoke any listeners |
| } |
| |
| protected boolean alive(Member mbr) { |
| return TcpFailureDetector.memberAlive(mbr, |
| COORD_ALIVE, |
| false, |
| false, |
| waitForCoordMsgTimeout, |
| waitForCoordMsgTimeout, |
| getOptionFlag()); |
| } |
| |
| protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge")); |
| MemberImpl local = (MemberImpl)getLocalMember(false); |
| Membership merged = new Membership(local,AbsoluteOrder.comp,true); |
| Arrays.fill(merged,msg.getMembers()); |
| Arrays.fill(merged,getMembers()); |
| Member[] diff = Arrays.diff(merged,membership,local); |
| for ( int i=0; i<diff.length; i++ ) { |
| if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); |
| else memberAdded(diff[i],false); |
| } |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge")); |
| return merged; |
| } |
| |
| protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException { |
| if ( !coordMsgReceived.get() ) { |
| coordMsgReceived.set(true); |
| synchronized (electionMutex) { electionMutex.notifyAll();} |
| } |
| msg.timestamp = System.currentTimeMillis(); |
| Membership merged = mergeOnArrive(msg, sender); |
| if (isViewConf(msg)) handleViewConf(msg, sender, merged); |
| else handleToken(msg, sender, merged); |
| ClassLoader loader; |
| |
| } |
| |
| protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { |
| MemberImpl local = (MemberImpl)getLocalMember(false); |
| if ( local.equals(msg.getSource()) ) { |
| //my message msg.src=local |
| handleMyToken(local, msg, sender,merged); |
| } else { |
| handleOtherToken(local, msg, sender,merged); |
| } |
| } |
| |
| protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { |
| if ( local.equals(msg.getLeader()) ) { |
| //no leadership change |
| if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) { |
| msg.type = COORD_CONF; |
| super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null); |
| handleViewConf(msg,local,merged); |
| } else { |
| //membership change |
| suggestedView = new Membership(local,AbsoluteOrder.comp,true); |
| suggestedviewId = msg.getId(); |
| Arrays.fill(suggestedView,merged.getMembers()); |
| msg.view = (MemberImpl[])merged.getMembers(); |
| sendElectionMsgToNextInline(local,msg); |
| } |
| } else { |
| //leadership change |
| suggestedView = null; |
| suggestedviewId = null; |
| msg.view = (MemberImpl[])merged.getMembers(); |
| sendElectionMsgToNextInline(local,msg); |
| } |
| } |
| |
| protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { |
| if ( local.equals(msg.getLeader()) ) { |
| //I am the new leader |
| //startElection(false); |
| } else { |
| msg.view = (MemberImpl[])merged.getMembers(); |
| sendElectionMsgToNextInline(local,msg); |
| } |
| } |
| |
| protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { |
| if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view |
| view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); |
| Arrays.fill(view,msg.getMembers()); |
| viewId = msg.getId(); |
| |
| if ( viewId.equals(suggestedviewId) ) { |
| suggestedView = null; |
| suggestedviewId = null; |
| } |
| |
| if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { |
| suggestedView = null; |
| suggestedviewId = null; |
| } |
| |
| viewChange(viewId,view.getMembers()); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); |
| |
| if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { |
| startElection(false); |
| } |
| } |
| |
| protected boolean isViewConf(CoordinationMessage msg) { |
| return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length); |
| } |
| |
| protected boolean hasHigherPriority(Member[] complete, Member[] local) { |
| if ( local == null || local.length == 0 ) return false; |
| if ( complete == null || complete.length == 0 ) return true; |
| AbsoluteOrder.absoluteOrder(complete); |
| AbsoluteOrder.absoluteOrder(local); |
| return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0); |
| |
| } |
| |
| |
| /** |
| * Returns coordinator if one is available |
| * @return Member |
| */ |
| public Member getCoordinator() { |
| return (view != null && view.hasMembers()) ? view.getMembers()[0] : null; |
| } |
| |
| public Member[] getView() { |
| return (view != null && view.hasMembers()) ? view.getMembers() : new Member[0]; |
| } |
| |
| public UniqueId getViewId() { |
| return viewId; |
| } |
| |
| /** |
| * Block in/out messages while a election is going on |
| */ |
| protected void halt() { |
| |
| } |
| |
| /** |
| * Release lock for in/out messages election is completed |
| */ |
| protected void release() { |
| |
| } |
| |
| /** |
| * Wait for an election to end |
| */ |
| protected void waitForRelease() { |
| |
| } |
| |
| |
| //============================================================================================================ |
| // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE |
| //============================================================================================================ |
| public void start(int svc) throws ChannelException { |
| if (membership == null) setupMembership(); |
| if (started)return; |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start")); |
| super.start(startsvc); |
| started = true; |
| if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start")); |
| startElection(false); |
| } |
| |
| public void stop(int svc) throws ChannelException { |
| try { |
| halt(); |
| synchronized (electionMutex) { |
| if (!started)return; |
| started = false; |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop")); |
| super.stop(startsvc); |
| this.view = null; |
| this.viewId = null; |
| this.suggestedView = null; |
| this.suggestedviewId = null; |
| this.membership.reset(); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop")); |
| } |
| }finally { |
| release(); |
| } |
| } |
| |
| |
| public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { |
| waitForRelease(); |
| super.sendMessage(destination, msg, payload); |
| } |
| |
| public void messageReceived(ChannelMessage msg) { |
| if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { |
| //ignore message, its an alive message |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message")); |
| |
| } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { |
| try { |
| CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); |
| Member[] cmbr = cmsg.getMembers(); |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")")); |
| processCoordMessage(cmsg, msg.getAddress()); |
| }catch ( ChannelException x ) { |
| log.error("Error processing coordination message. Could be fatal.",x); |
| } |
| } else { |
| super.messageReceived(msg); |
| } |
| } |
| |
| public boolean accept(ChannelMessage msg) { |
| return super.accept(msg); |
| } |
| |
| public void memberAdded(Member member) { |
| memberAdded(member,true); |
| } |
| |
| public void memberAdded(Member member,boolean elect) { |
| try { |
| if ( membership == null ) setupMembership(); |
| if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); |
| try { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")")); |
| if (started && elect) startElection(false); |
| }catch ( ChannelException x ) { |
| log.error("Unable to start election when member was added.",x); |
| } |
| }finally { |
| } |
| |
| } |
| |
| public void memberDisappeared(Member member) { |
| try { |
| |
| membership.removeMember((MemberImpl)member); |
| super.memberDisappeared(member); |
| try { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")")); |
| if ( started && (isCoordinator() || isHighest()) ) |
| startElection(true); //to do, if a member disappears, only the coordinator can start |
| }catch ( ChannelException x ) { |
| log.error("Unable to start election when member was removed.",x); |
| } |
| }finally { |
| } |
| } |
| |
| public boolean isHighest() { |
| Member local = getLocalMember(false); |
| if ( membership.getMembers().length == 0 ) return true; |
| else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0; |
| } |
| |
| public boolean isCoordinator() { |
| Member coord = getCoordinator(); |
| return coord != null && getLocalMember(false).equals(coord); |
| } |
| |
| public void heartbeat() { |
| try { |
| MemberImpl local = (MemberImpl)getLocalMember(false); |
| if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) { |
| if ( isHighest() ) { |
| fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this, |
| "Heartbeat found inconsistency, restart election")); |
| startElection(true); |
| } |
| } |
| } catch ( Exception x ){ |
| log.error("Unable to perform heartbeat.",x); |
| } finally { |
| super.heartbeat(); |
| } |
| } |
| |
| /** |
| * has members |
| */ |
| public boolean hasMembers() { |
| |
| return membership.hasMembers(); |
| } |
| |
| /** |
| * Get all current cluster members |
| * @return all members or empty array |
| */ |
| public Member[] getMembers() { |
| |
| return membership.getMembers(); |
| } |
| |
| /** |
| * |
| * @param mbr Member |
| * @return Member |
| */ |
| public Member getMember(Member mbr) { |
| |
| return membership.getMember(mbr); |
| } |
| |
| /** |
| * Return the member that represents this node. |
| * |
| * @return Member |
| */ |
| public Member getLocalMember(boolean incAlive) { |
| Member local = super.getLocalMember(incAlive); |
| if ( view == null && (local != null)) setupMembership(); |
| return local; |
| } |
| |
| protected synchronized void setupMembership() { |
| if ( membership == null ) { |
| membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false); |
| } |
| } |
| |
| |
| //============================================================================================================ |
| // HELPER CLASSES FOR COORDINATION |
| //============================================================================================================ |
| |
| |
| |
| |
| public static class CoordinationMessage { |
| //X{A-ldr, A-src, mbrs-A,B,C,D} |
| protected XByteBuffer buf; |
| protected MemberImpl leader; |
| protected MemberImpl source; |
| protected MemberImpl[] view; |
| protected UniqueId id; |
| protected byte[] type; |
| protected long timestamp = System.currentTimeMillis(); |
| |
| public CoordinationMessage(XByteBuffer buf) { |
| this.buf = buf; |
| parse(); |
| } |
| |
| public CoordinationMessage(MemberImpl leader, |
| MemberImpl source, |
| MemberImpl[] view, |
| UniqueId id, |
| byte[] type) { |
| this.buf = new XByteBuffer(4096,false); |
| this.leader = leader; |
| this.source = source; |
| this.view = view; |
| this.id = id; |
| this.type = type; |
| this.write(); |
| } |
| |
| |
| public byte[] getHeader() { |
| return NonBlockingCoordinator.COORD_HEADER; |
| } |
| |
| public MemberImpl getLeader() { |
| if ( leader == null ) parse(); |
| return leader; |
| } |
| |
| public MemberImpl getSource() { |
| if ( source == null ) parse(); |
| return source; |
| } |
| |
| public UniqueId getId() { |
| if ( id == null ) parse(); |
| return id; |
| } |
| |
| public MemberImpl[] getMembers() { |
| if ( view == null ) parse(); |
| return view; |
| } |
| |
| public byte[] getType() { |
| if (type == null ) parse(); |
| return type; |
| } |
| |
| public XByteBuffer getBuffer() { |
| return this.buf; |
| } |
| |
| public void parse() { |
| //header |
| int offset = 16; |
| //leader |
| int ldrLen = buf.toInt(buf.getBytesDirect(),offset); |
| offset += 4; |
| byte[] ldr = new byte[ldrLen]; |
| System.arraycopy(buf.getBytesDirect(),offset,ldr,0,ldrLen); |
| leader = MemberImpl.getMember(ldr); |
| offset += ldrLen; |
| //source |
| int srcLen = buf.toInt(buf.getBytesDirect(),offset); |
| offset += 4; |
| byte[] src = new byte[srcLen]; |
| System.arraycopy(buf.getBytesDirect(),offset,src,0,srcLen); |
| source = MemberImpl.getMember(src); |
| offset += srcLen; |
| //view |
| int mbrCount = buf.toInt(buf.getBytesDirect(),offset); |
| offset += 4; |
| view = new MemberImpl[mbrCount]; |
| for (int i=0; i<view.length; i++ ) { |
| int mbrLen = buf.toInt(buf.getBytesDirect(),offset); |
| offset += 4; |
| byte[] mbr = new byte[mbrLen]; |
| System.arraycopy(buf.getBytesDirect(), offset, mbr, 0, mbrLen); |
| view[i] = MemberImpl.getMember(mbr); |
| offset += mbrLen; |
| } |
| //id |
| this.id = new UniqueId(buf.getBytesDirect(),offset,16); |
| offset += 16; |
| type = new byte[16]; |
| System.arraycopy(buf.getBytesDirect(), offset, type, 0, type.length); |
| offset += 16; |
| |
| } |
| |
| public void write() { |
| buf.reset(); |
| //header |
| buf.append(COORD_HEADER,0,COORD_HEADER.length); |
| //leader |
| byte[] ldr = leader.getData(false,false); |
| buf.append(ldr.length); |
| buf.append(ldr,0,ldr.length); |
| ldr = null; |
| //source |
| byte[] src = source.getData(false,false); |
| buf.append(src.length); |
| buf.append(src,0,src.length); |
| src = null; |
| //view |
| buf.append(view.length); |
| for (int i=0; i<view.length; i++ ) { |
| byte[] mbr = view[i].getData(false,false); |
| buf.append(mbr.length); |
| buf.append(mbr,0,mbr.length); |
| } |
| //id |
| buf.append(id.getBytes(),0,id.getBytes().length); |
| buf.append(type,0,type.length); |
| } |
| } |
| |
| public void fireInterceptorEvent(InterceptorEvent event) { |
| if (event instanceof CoordinationEvent && |
| ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) |
| log.info(event); |
| } |
| |
| public static class CoordinationEvent implements InterceptorEvent { |
| public static final int EVT_START = 1; |
| public static final int EVT_MBR_ADD = 2; |
| public static final int EVT_MBR_DEL = 3; |
| public static final int EVT_START_ELECT = 4; |
| public static final int EVT_PROCESS_ELECT = 5; |
| public static final int EVT_MSG_ARRIVE = 6; |
| public static final int EVT_PRE_MERGE = 7; |
| public static final int EVT_POST_MERGE = 8; |
| public static final int EVT_WAIT_FOR_MSG = 9; |
| public static final int EVT_SEND_MSG = 10; |
| public static final int EVT_STOP = 11; |
| public static final int EVT_CONF_RX = 12; |
| public static final int EVT_ELECT_ABANDONED = 13; |
| |
| int type; |
| ChannelInterceptor interceptor; |
| Member coord; |
| Member[] mbrs; |
| String info; |
| Membership view; |
| Membership suggestedView; |
| public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) { |
| this.type = type; |
| this.interceptor = interceptor; |
| this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator(); |
| this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers(); |
| this.info = info; |
| this.view = ((NonBlockingCoordinator)interceptor).view; |
| this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView; |
| } |
| |
| public int getEventType() { |
| return type; |
| } |
| |
| public String getEventTypeDesc() { |
| switch (type) { |
| case EVT_START: return "EVT_START:"+info; |
| case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info; |
| case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info; |
| case EVT_START_ELECT: return "EVT_START_ELECT:"+info; |
| case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info; |
| case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info; |
| case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info; |
| case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info; |
| case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info; |
| case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info; |
| case EVT_STOP: return "EVT_STOP:"+info; |
| case EVT_CONF_RX: return "EVT_CONF_RX:"+info; |
| case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info; |
| default: return "Unknown"; |
| } |
| } |
| |
| public ChannelInterceptor getInterceptor() { |
| return interceptor; |
| } |
| |
| public String toString() { |
| StringBuffer buf = new StringBuffer("CoordinationEvent[type="); |
| buf.append(type).append("\n\tLocal:"); |
| Member local = interceptor.getLocalMember(false); |
| buf.append(local!=null?local.getName():"").append("\n\tCoord:"); |
| buf.append(coord!=null?coord.getName():"").append("\n\tView:"); |
| buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:"); |
| buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:"); |
| buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:"); |
| buf.append(info).append("]"); |
| return buf.toString(); |
| } |
| } |
| |
| |
| |
| |
| |
| } |