| /** Notice of modification as required by the LGPL |
| * This file was modified by Gemstone Systems Inc. on |
| * $Date$ |
| **/ |
| // $Id: GMS.java,v 1.49 2005/12/23 14:57:06 belaban Exp $ |
| |
| package com.gemstone.org.jgroups.protocols.pbcast; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Hashtable; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.Vector; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.gemstone.org.jgroups.Address; |
| import com.gemstone.org.jgroups.Event; |
| import com.gemstone.org.jgroups.Global; |
| import com.gemstone.org.jgroups.Header; |
| import com.gemstone.org.jgroups.JGroupsVersion; |
| import com.gemstone.org.jgroups.Membership; |
| import com.gemstone.org.jgroups.MergeView; |
| import com.gemstone.org.jgroups.Message; |
| import com.gemstone.org.jgroups.ShunnedAddressException; |
| import com.gemstone.org.jgroups.SuspectMember; |
| import com.gemstone.org.jgroups.TimeoutException; |
| import com.gemstone.org.jgroups.View; |
| import com.gemstone.org.jgroups.ViewId; |
| import com.gemstone.org.jgroups.protocols.FD_SOCK; |
| import com.gemstone.org.jgroups.protocols.UNICAST; |
| import com.gemstone.org.jgroups.stack.GossipServer; |
| import com.gemstone.org.jgroups.stack.IpAddress; |
| import com.gemstone.org.jgroups.stack.Protocol; |
| import com.gemstone.org.jgroups.util.AckCollector; |
| import com.gemstone.org.jgroups.util.BoundedList; |
| import com.gemstone.org.jgroups.util.ExternalStrings; |
| import com.gemstone.org.jgroups.util.GFStringIdImpl; |
| import com.gemstone.org.jgroups.util.GemFireTracer; |
| import com.gemstone.org.jgroups.util.Promise; |
| import com.gemstone.org.jgroups.util.QueueClosedException; |
| import com.gemstone.org.jgroups.util.Streamable; |
| import com.gemstone.org.jgroups.util.TimeScheduler; |
| import com.gemstone.org.jgroups.util.Util; |
| |
| /** |
| * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views |
| * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive |
| * any messages until they are members. |
| */ |
| public class GMS extends Protocol { |
| protected/*GemStoneAddition*/ GmsImpl impl=null; |
| Address local_addr=null; |
| final Membership members=new Membership(); // real membership |
| private final Membership tmp_members=new Membership(); // base for computing next view |
| static final String FAILED_TO_ACK_MEMBER_VERIFY_THREAD = "Failed to ACK member verify Thread"; |
| |
| /** Members joined but for which no view has been received yet */ |
| private final Vector joining=new Vector(7); |
| |
| /** Members excluded from group, but for which no view has been received yet */ |
| private final Vector leaving=new Vector(7); |
| |
| View view=null; |
| ViewId view_id=null; |
| private long ltime=0; |
| long join_timeout=5000; |
| long join_retry_timeout=2000; |
| long leave_timeout=5000; |
| private long digest_timeout=0; // time to wait for a digest (from PBCAST). should be fast |
| long merge_timeout=10000; // time to wait for all MERGE_RSPS |
| private final Object impl_mutex=new Object(); // synchronizes event entry into impl |
| private final Object digest_mutex=new Object(); |
| private final Promise digest_promise=new Promise(); // holds result of GET_DIGEST event |
| private final Hashtable impls=new Hashtable(3); |
| private boolean shun=true; |
| boolean merge_leader=false; // can I initiate a merge ? |
| private boolean print_local_addr=true; |
| boolean disable_initial_coord=false; // can the member become a coord on startup or not ? |
| /** Setting this to false disables concurrent startups. This is only used by unit testing code |
| * for testing merging. To everybody else: don't change it to false ! */ |
| boolean handle_concurrent_startup=true; |
| static final String CLIENT="Client"; |
| static final String COORD="Coordinator"; |
| static final String PART="Participant"; |
| TimeScheduler timer=null; |
| private volatile boolean joined; // GemStoneAddition - has this member joined? |
| private volatile boolean disconnected; // GemStoneAddition - has this member disconnected? |
| /** |
| * GemStoneAddition - network partition detection uses a "leader" |
| * process to decide which partition will survive. |
| */ |
| volatile Address leader; |
| |
| /** |
| * GemStoneAddition - are coordinators forced to be colocated with gossip servers? |
| */ |
| boolean floatingCoordinatorDisabled; |
| |
| /** |
| * GemStoneAddition - is this process split-brain-detection-enabled? If so, |
| * it's eligible to be the leader process. |
| */ |
| boolean splitBrainDetectionEnabled; |
| |
| /** |
| * GemStoneAddition - indicates whether a network partition has already been detected. |
| * This is used to prevent this member from installing a new view after a network partition. |
| * See bug #39279 |
| */ |
| protected boolean networkPartitionDetected = false; |
| |
| /** Max number of old members to keep in history */ |
| protected int num_prev_mbrs=50; |
| |
| /** Keeps track of old members (up to num_prev_mbrs) */ |
| BoundedList prev_members=null; |
| |
| int num_views=0; |
| |
| /** Stores the last 20 views */ |
| BoundedList prev_views=new BoundedList(20); |
| |
| |
| /** Class to process JOIN, LEAVE and MERGE requests */ |
| public final ViewHandler view_handler=new ViewHandler(); |
| |
| /** To collect VIEW_ACKs from all members */ |
| final AckCollector ack_collector=new AckCollector(); |
| |
| /** To collect PREPARE_FOR_VIEW acks from members */ |
| final AckCollector prepare_collector = new AckCollector(); // GemStoneAddition |
| |
| /** Time in ms to wait for all VIEW acks (0 == wait forever) */ |
| long view_ack_collection_timeout=12437; // GemStoneAddition - was 20000; |
| // for 6.5 release, changed from 17437 to 12437 - well below the default join timeout period |
| |
| /** How long should a Resumer wait until resuming the ViewHandler */ |
| long resume_task_timeout=17439; // GemStoneAddition - was 20000; |
| |
| private int partitionThreshold = 51; // GemStoneAddition |
| |
| private int memberWeight; // GemStoneAddition - see network partition detection spec |
| private volatile View preparedView; // GemStoneAddition - see network partition detection spec |
| private View previousPreparedView; // GemStoneAddition |
| private volatile Address coordinator; |
| |
| public static final String name="GMS"; |
| |
| /** GemStoneAddition - if set this causes view casting to be delayed by some number of seconds */ |
| public static int TEST_HOOK_SLOW_VIEW_CASTING; |
| |
| /** |
| * GemStoneAddition - amount of time to wait for additional join/leave |
| * requests before processing. Set gemfire.VIEW_BUNDLING_WAIT_TIME to |
| * the number of milliseconds. Defaults to 150ms. |
| */ |
| static final long BUNDLE_WAITTIME = Integer.getInteger("gemfire.VIEW_BUNDLING_WAIT_TIME", 150).intValue(); |
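| // e.g. starting the JVM with -Dgemfire.VIEW_BUNDLING_WAIT_TIME=300 widens the bundling window to 300ms |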
| |
| private Object installViewLock = new Object(); // lock to ensure that view installation atomically updates the view, transitions coordinators and notifies locators |
| |
| public GMS() { |
| initState(); |
| } |
| |
| |
| @Override // GemStoneAddition |
| public String getName() { |
| return name; |
| } |
| |
| // start GemStoneAddition |
| @Override // GemStoneAddition |
| public int getProtocolEnum() { |
| return com.gemstone.org.jgroups.stack.Protocol.enumGMS; |
| } |
| // end GemStone addition |
| |
| |
| |
| public String getView() {return view_id != null? view_id.toString() : "null";} |
| public int getNumberOfViews() {return num_views;} |
| public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";} |
| public String getMembers() {return members != null? members.toString() : "[]";} |
| public int getNumMembers() {return members != null? members.size() : 0;} |
| public long getJoinTimeout() {return join_timeout;} |
| public void setJoinTimeout(long t) {join_timeout=t;} |
| public long getJoinRetryTimeout() {return join_retry_timeout;} |
| public void setJoinRetryTimeout(long t) {join_retry_timeout=t;} |
| public boolean isShun() {return shun;} |
| public void setShun(boolean s) {shun=s;} |
| public String printPreviousMembers() { |
| StringBuffer sb=new StringBuffer(); |
| if(prev_members != null) { |
| for(Enumeration en=prev_members.elements(); en.hasMoreElements();) { |
| sb.append(en.nextElement()).append("\n"); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| public int viewHandlerSize() {return view_handler.size();} |
| public boolean isViewHandlerSuspended() {return view_handler.suspended();} |
| public String dumpViewHandlerQueue() { |
| return view_handler.dumpQueue(); |
| } |
| public String dumpViewHandlerHistory() { |
| return view_handler.dumpHistory(); |
| } |
| public void suspendViewHandler() { |
| view_handler.suspend(null); |
| } |
| public void resumeViewHandler() { |
| view_handler.resumeForce(); |
| } |
| |
| GemFireTracer getLog() {return log;} |
| |
| public String printPreviousViews() { |
| StringBuffer sb=new StringBuffer(); |
| for(Enumeration en=prev_views.elements(); en.hasMoreElements();) { |
| sb.append(en.nextElement()).append("\n"); |
| } |
| return sb.toString(); |
| } |
| |
| public boolean isCoordinator() { |
| Address coord=determineCoordinator(); |
| return coord != null && local_addr != null && local_addr.equals(coord); |
| } |
| |
| /** |
| * For testing we sometimes need to be able to disable disconnecting during |
| * failure scenarios |
| */ |
| public void disableDisconnectOnQuorumLossForTesting() { |
| this.splitBrainDetectionEnabled = false; |
| } |
| |
| |
| @Override // GemStoneAddition |
| public void resetStats() { |
| super.resetStats(); |
| num_views=0; |
| prev_views.removeAll(); |
| } |
| |
| |
| @Override // GemStoneAddition |
| public Vector requiredDownServices() { |
| Vector retval=new Vector(3); |
| retval.addElement(Integer.valueOf(Event.GET_DIGEST)); |
| retval.addElement(Integer.valueOf(Event.SET_DIGEST)); |
| retval.addElement(Integer.valueOf(Event.FIND_INITIAL_MBRS)); |
| return retval; |
| } |
| |
| |
| public void setImpl(GmsImpl new_impl) { |
| synchronized(impl_mutex) { |
| if(impl == new_impl) // superfluous |
| return; |
| impl=new_impl; |
| // if (impl instanceof CoordGmsImpl) { |
| // log.getLogWriter().info( |
| // JGroupsStrings.GMS_THIS_MEMBER_IS_BECOMING_GROUP_COORDINATOR); |
| // } |
| if(log.isDebugEnabled()) { |
| String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName(); |
| log.debug(msg); |
| } |
| } |
| } |
| |
| |
| public GmsImpl getImpl() { |
| return impl; |
| } |
| |
| |
| @Override // GemStoneAddition |
| public void init() throws Exception { |
| prev_members=new BoundedList(num_prev_mbrs); |
| timer=stack != null? stack.timer : null; |
| if(timer == null) |
| throw new Exception("GMS.init(): timer is null"); |
| if(impl != null) |
| impl.init(); |
| } |
| |
| @Override // GemStoneAddition |
| public void start() throws Exception { |
| this.disconnected = false; |
| if(impl != null) impl.start(); |
| } |
| |
| @Override // GemStoneAddition |
| public void stop() { |
| view_handler.stop(true); |
| if(impl != null) impl.stop(); |
| if(prev_members != null) |
| prev_members.removeAll(); |
| } |
| |
| |
| // GemStoneAddition - added suspects argument for bug 41772 |
| public synchronized void becomeCoordinator(Vector suspects) { |
| if (!(impl instanceof CoordGmsImpl)) { // GemStoneAddition - synchronized, and check for redundant becomeCoordinator |
| log.getLogWriter().info(ExternalStrings.GMS_THIS_MEMBER_0_IS_BECOMING_GROUP_COORDINATOR, this.local_addr/*, new Exception("stack trace")*/); |
| CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD); |
| if(tmp == null) { |
| tmp=new CoordGmsImpl(this); |
| impls.put(COORD, tmp); |
| } |
| try { |
| tmp.init(); |
| } |
| catch(Exception e) { |
| log.error(ExternalStrings.GMS_EXCEPTION_SWITCHING_TO_COORDINATOR_ROLE, e); |
| } |
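| // GemStoneAddition - if this member does not yet have a birth view id, stamp it with the current view id (or 0 if no view has been installed yet) |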
| if (((IpAddress)this.local_addr).getBirthViewId() < 0) { |
| ((IpAddress)this.local_addr).setBirthViewId( |
| this.view_id == null? 0 : this.view_id.getId()); |
| } |
| setImpl(tmp); |
| if (suspects != null && suspects.size() > 0) { |
| List suspectList = new LinkedList(suspects); |
| impl.handleLeave(suspectList, true, |
| Collections.singletonList("Member was suspected of being dead prior to " + this.local_addr |
| + " becoming group coordinator"), false); |
| } |
| } |
| } |
| |
| // GemStoneAddition - logical time can become confused when a |
| // coordinator sends out a view using unicast and the view isn't |
| // sent to all members before the coordinator dies. We increment the |
| // Lamport clock to avoid missing updates |
| protected synchronized void incrementLtime(int amount) { |
| ltime += amount; |
| } |
| |
| |
| public void becomeParticipant() { |
| if (this.stack.getChannel().closing()) { // GemStoneAddition - fix for bug #42969 |
| return; // don't try to become a participant if we're shutting down |
| } |
| ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART); |
| |
| if(tmp == null) { |
| tmp=new ParticipantGmsImpl(this); |
| impls.put(PART, tmp); |
| } |
| try { |
| tmp.init(); |
| } |
| catch(Exception e) { |
| log.error(ExternalStrings.GMS_EXCEPTION_SWITCHING_TO_PARTICIPANT, e); |
| } |
| setImpl(tmp); |
| } |
| |
| public void becomeClient() { |
| ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT); |
| if(tmp == null) { |
| tmp=new ClientGmsImpl(this); |
| impls.put(CLIENT, tmp); |
| } |
| try { |
| tmp.init(); |
| } |
| catch(Exception e) { |
| log.error(ExternalStrings.GMS_EXCEPTION_SWITCHING_TO_CLIENT_ROLE, e); |
| } |
| setImpl(tmp); |
| } |
| |
| |
| boolean haveCoordinatorRole() { |
| return impl != null && impl instanceof CoordGmsImpl; |
| } |
| |
| boolean haveParticipantRole() { |
| return impl != null && impl instanceof ParticipantGmsImpl; |
| } |
| |
| |
| /** |
| * Computes the next view. Returns a copy that has <code>left_mbrs</code> and |
| * <code>suspected_mbrs</code> removed and <code>added_mbrs</code> added. |
| */ |
| public View getNextView(Vector added_mbrs, Vector left_mbrs, Vector suspected_mbrs) { |
| Vector mbrs; |
| long vid; |
| View v; |
| Membership tmp_mbrs; |
| Address tmp_mbr; |
| |
| synchronized(members) { |
| if(view_id == null) { |
| // return null; // this should *never* happen ! GemStoneAddition |
| // log.error("view_id is null", new Exception()); // GemStoneAddition debug |
| vid = 0; // GemStoneAddition |
| } |
| else { |
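| // Lamport clock: the new view id must exceed both the current view id and any logical time already observed |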
| vid=Math.max(view_id.getId(), ltime) + 1; |
| } |
| ltime=vid; |
| tmp_mbrs=tmp_members.copy(); // always operate on the temporary membership |
| |
| if (suspected_mbrs != null) { // GemStoneAddition - if a mbr is shutting down, just remove it |
| if (left_mbrs == null) { |
| left_mbrs = new Vector(); |
| } |
| for (Iterator<IpAddress> it = suspected_mbrs.iterator(); it.hasNext(); ) { |
| IpAddress addr = it.next(); |
| if (this.stack.gfPeerFunctions.isShuttingDown(addr)) { |
| it.remove(); |
| left_mbrs.add(addr); |
| } |
| } |
| } |
| |
| tmp_mbrs.remove(suspected_mbrs); |
| tmp_mbrs.remove(left_mbrs); |
| tmp_mbrs.add(added_mbrs); |
| mbrs=tmp_mbrs.getMembers(); |
| |
| // putting the coordinator at the front caused problems with dlock |
| // because the coordinator might think it is an elder, but the old |
| // elder would not release its role, causing a deadlock. See #47562 |
| // if (mbrs.contains(local_addr) && !mbrs.get(0).equals(local_addr)) { |
| // mbrs.remove(local_addr); |
| // mbrs.add(0, this.local_addr); |
| // } |
| |
| v=new View(local_addr, vid, mbrs, suspected_mbrs); |
| |
| // Update membership (see DESIGN for explanation): |
| tmp_members.set(mbrs); |
| |
| // Update joining list (see DESIGN for explanation) |
| if(added_mbrs != null) { |
| for(int i=0; i < added_mbrs.size(); i++) { |
| tmp_mbr=(Address)added_mbrs.elementAt(i); |
| if(!joining.contains(tmp_mbr)) |
| joining.addElement(tmp_mbr); |
| } |
| } |
| |
| // Update leaving list (see DESIGN for explanations) |
| if(left_mbrs != null) { |
| for(Iterator it=left_mbrs.iterator(); it.hasNext();) { |
| Address addr=(Address)it.next(); |
| if(!this.leaving.contains(addr)) |
| this.leaving.add(addr); |
| } |
| } |
| if(suspected_mbrs != null) { |
| for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) { |
| Address addr=(Address)it.next(); |
| if(!this.leaving.contains(addr)) |
| this.leaving.add(addr); |
| } |
| } |
| |
| if(log.isDebugEnabled()) log.debug("new view is " + v); |
| return v; |
| } |
| } |
| |
| |
| /** |
| Compute a new view, given the current view, the new members and the suspected/left |
| members. Then simply mcast the view to all members. This is different from the VS GMS protocol, |
| in which we run a FLUSH protocol which tries to achieve consensus on the set of messages mcast in |
| the current view before proceeding to install the next view. |
| |
| The members for the new view are computed as follows: |
| <pre> |
| existing leaving suspected joining |
| |
| 1. new_view y n n y |
| 2. tmp_view y y n y |
| (view_dest) |
| </pre> |
| |
| <ol> |
| <li> |
| The new view to be installed includes the existing members plus the joining ones and |
| excludes the leaving and suspected members. |
| <li> |
| A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer |
| (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared |
| to the new view, leaving members are <em>included</em> since they are waiting for a |
| view in which they are not members any longer before they leave. So, if we did not set a |
| temporary view, joining members would not receive the view (signalling that they have been |
| joined successfully). The temporary view is essentially the current view plus the joining |
| members (old members are still part of the current view). |
| </ol> |
| * @param mcast whether multicast may be used (GemStoneAddition - currently unused; castViewChangeWithDest determines multicast use from the mcast port) |
| */ |
| public void castViewChange(Vector new_mbrs, Vector left_mbrs, Vector suspected_mbrs, boolean mcast) { |
| View new_view; |
| |
| // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs |
| new_view=getNextView(new_mbrs, left_mbrs, suspected_mbrs); |
| castViewChange(new_view, null, true); |
| } |
| |
| |
| public void castViewChange(View new_view, Digest digest, boolean mcast) { |
| //boolean mcast = stack.jgmm.getDistributionConfig().getMcastPort() > 0; // GemStoneAddition |
| castViewChangeWithDest(new_view, digest, null, true); |
| } |
| |
| // GemStoneAddition - send the view with unicast to avoid NAKACK rejection of non-member |
| protected void ucastViewChange(View new_view) { |
| castViewChangeWithDest(new_view, null, null, false); |
| } |
| |
| |
| /** |
| * Broadcasts the new view and digest, and waits for acks from all members in the list given as argument. |
| * If the list is null, we take the members who are part of new_view |
| * <p>GemStoneAddition much of this method was rewritten for quorum-based |
| * network partition detection |
| * @param new_view the view to send |
| * @param digest the digest to send along with the view (may be null) |
| * @param newMbrs_p the members to send the view to and collect acks from; if null, the members of new_view are used |
| * @param mcast_p whether multicast may be used (GemStoneAddition) |
| */ |
| public void castViewChangeWithDest(View new_view, Digest digest, java.util.List newMbrs_p, boolean mcast_p) { |
| GmsHeader hdr; |
| long start, stop; |
| ViewId vid=new_view.getVid(); |
| int size=-1; |
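| // GemStoneAddition - view installation is two-phased: PREPARE_FOR_VIEW is sent and acknowledged |
| // first (prepareView), then the VIEW itself is sent and VIEW_ACKs are collected; members that |
| // fail to ack in time are liveness-checked and, if unreachable, removed via handleLeave |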
| |
| if (this.disconnected) { |
| // GemStoneAddition: during debugging #50633 ViewHandler was found to |
| // be casting a new view after the stack had been disconnected. UNICAST |
| // had already reset all of its connections so everyone ignored the |
| // view, but that's not guaranteed to be the case. |
| return; |
| } |
| |
| if (TEST_HOOK_SLOW_VIEW_CASTING > 0) { |
| try { |
| log.getLogWriter().info(ExternalStrings.DEBUG, |
| "Delaying view casting by " + TEST_HOOK_SLOW_VIEW_CASTING |
| + " seconds for testing"); |
| Thread.sleep(TEST_HOOK_SLOW_VIEW_CASTING * 1000); |
| } catch (InterruptedException e) { |
| log.getLogWriter().fine("Test hook interrupted while sleeping"); |
| Thread.currentThread().interrupt(); |
| } finally { |
| TEST_HOOK_SLOW_VIEW_CASTING = 0; |
| } |
| } |
| |
| // GemStoneAddition |
| // we've already sent a tmp_view down the stack to inform comm protocols |
| // about new members. This tmp_view notification will remove old members |
| // as well so that address canonicalization will not pick up old addresses |
| // if a new member reuses an addr:port of an old member |
| passDown(new Event(Event.TMP_VIEW, new_view)); |
| |
| // do not multicast the view unless we really have a multicast port. Otherwise |
| // we are relying on weak messaging support in TP based on the last installed view |
| // see bug #39429 |
| boolean mcast = mcast_p && stack.gfPeerFunctions.getMcastPort() > 0; // GemStoneAddition |
| |
| List newMbrs = newMbrs_p; // GemStoneAddition - don't update formal parameters |
| if(newMbrs == null || newMbrs.size() == 0) { |
| newMbrs=new LinkedList(new_view.getMembers()); |
| } |
| |
| Set suspects = new HashSet(new_view.getSuspectedMembers()); |
| for (Iterator it=suspects.iterator(); it.hasNext(); ) { |
| IpAddress addr = (IpAddress)it.next(); |
| if (this.stack.gfPeerFunctions.isShuttingDown(addr)) { // GemStoneAddition bug #44342 |
| new_view.notSuspect(addr); |
| } |
| } |
| |
| log.getLogWriter().info(ExternalStrings.GMS_MEMBERSHIP_SENDING_NEW_VIEW_0_1_MBRS, |
| new Object[] { new_view, Integer.valueOf(new_view.size())}/*, new Exception("STACK")*/); |
| |
| start=System.currentTimeMillis(); |
| hdr=new GmsHeader(GmsHeader.VIEW); |
| new_view.setMessageDigest(digest); // GemStoneAddition - move digest to message body |
| // hdr.my_digest=digest; |
| try { |
| // in 7.0.1 we switch to always performing 2-phased view installation and quorum checks |
| if (!prepareView(new_view, mcast, newMbrs)) { |
| return; |
| } |
| |
| synchronized (ack_collector) { // GemStoneAddition bug34505 |
| ack_collector.reset(vid, newMbrs); |
| size=ack_collector.size(); |
| } |
| |
| // GemStoneAddition - when PWNing an existing view, we have to unicast |
| // because NAKACK will reject a mcast message from a non-member |
| if (mcast) { |
| Message view_change_msg=new Message(); // bcast to all members |
| view_change_msg.isHighPriority = true; |
| view_change_msg.putHeader(name, hdr); |
| view_change_msg.setObject(new_view); |
| passDown(new Event(Event.MSG, view_change_msg)); |
| } |
| else { |
| for (int i=0; i<newMbrs.size(); i++) { |
| Message msg = new Message(); |
| msg.isHighPriority = true; |
| msg.putHeader(name, hdr); |
| msg.setObject(new_view); |
| msg.setDest((Address)newMbrs.get(i)); |
| passDown(new Event(Event.MSG, msg)); |
| } |
| } |
| // also send the view to any suspect members to give them a |
| // chance to exit. bypass use of UNICAST since it will refuse |
| // to send a message to a non-member |
| if (new_view.getSuspectedMembers() != null) { |
| for (Iterator it=new_view.getSuspectedMembers().iterator(); it.hasNext(); ) { |
| Message msg = new Message(); |
| msg.isHighPriority = true; |
| msg.putHeader(name, hdr); |
| msg.putHeader(UNICAST.BYPASS_UNICAST, hdr); |
| msg.setDest((Address)it.next()); |
| passDown(new Event(Event.MSG, msg)); |
| } |
| } |
| // if (new_view.getSuspectedMembers().size() > 0 && log.getLogWriterI18n().fineEnabled()) { |
| // log.getLogWriterI18n().warning( |
| // JGroupsStrings.DEBUG, "bogus warning to test failure mode" |
| // ); |
| // } |
| try { |
| ack_collector.waitForAllAcks(view_ack_collection_timeout); |
| if(trace) { |
| stop=System.currentTimeMillis(); |
| log.trace("received all ACKs (" + size + ") for " + vid + " in " + (stop-start) + "ms"); |
| } |
| } |
| catch(TimeoutException e) { |
| // GemStoneAddition - bug35218 |
| String missingStr; |
| String receivedStr; |
| synchronized (ack_collector) { |
| missingStr = ack_collector.getMissingAcks().toString(); |
| receivedStr = ack_collector.getReceived().toString(); |
| } |
| // In 8.0 this is changed to not do suspect processing if the coordinator is shutting down |
| if (!stack.getChannel().closing()) { |
| log.getLogWriter().warning( |
| ExternalStrings.GMS_FAILED_TO_COLLECT_ALL_ACKS_0_FOR_VIEW_1_AFTER_2_MS_MISSING_ACKS_FROM_3_RECEIVED_4_LOCAL_ADDR_5, |
| new Object[] {Integer.valueOf(size), vid, Long.valueOf(view_ack_collection_timeout), missingStr/*, receivedStr, local_addr*/}); |
| /* |
| * GemStoneAddition |
| * suspect members that did not respond. This must be done in |
| * a different thread to avoid lockups in FD_SOCK |
| */ |
| /*new Thread() { |
| @Override // GemStoneAddition |
| public void run() { |
| // bug #49448 fixed here by not synchronizing on ack-collector |
| // while suspect verification processing is going on |
| List suspects = ack_collector.getMissingAcks(); |
| suspect(suspects); |
| }}.start();*/ |
| checkFailedToAckMembers(new_view, ack_collector.getMissingAcks()); |
| } |
| } |
| } |
| finally { |
| // GemStoneAddition - now we can fully reset the collector so that |
| // it is ready to receive view-acks from join responses subsequent |
| // to this view. Prior to this change, these acks were lost when |
| // the view was reset in this method |
| synchronized(ack_collector) { |
| ack_collector.fullyReset(); |
| } |
| synchronized(prepare_collector) { |
| prepare_collector.fullyReset(); // GemStoneAddition - fix for #43886 |
| } |
| // GemStoneAddition - we don't need this anymore |
| this.previousPreparedView = null; |
| } |
| } |
| |
| |
| |
| /** |
| * GemStoneAddition - send a prepare to all members and require a response |
| * |
| * @param new_view the view being prepared |
| * @param mcast whether multicast may be used |
| * @param newMbrs the members that must acknowledge the preparation |
| * @return true if the view can be transmitted |
| */ |
| boolean prepareView(View new_view, boolean mcast, List newMbrs) { |
| |
| // compute the failed weight and form a set of non-admin failures |
| Set<IpAddress> failures = new HashSet(new_view.getSuspectedMembers()); |
| int failedWeight = processFailuresAndGetWeight(this.view, this.leader, failures); |
| |
| // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "[partition detection] these cache processes failed: " + failures); |
| |
| // this.preparedView may have been set before this vm decided to send out |
| // a view. If it was, we use it to modify the current view and try again |
| if (!processPreparedView(this.preparedView, new_view, newMbrs, false)) { |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("processPreparedView failed; aborting view " + new_view.getVid()); |
| } |
| return false; |
| } |
| |
| |
| List<IpAddress> failedToAck = sendPrepareForViewChange(new_view, newMbrs, false); |
| |
| // this.preparedView may have been set by a PREPARE_VIEW_ACK response |
| // if another member received a PREPARE_VIEW with a different view than |
| // we just tried to prepare |
| if (!processPreparedView(this.preparedView, new_view, newMbrs, false)) { |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("processPreparedView failed; aborting view " + new_view.getVid()); |
| } |
| return false; |
| } |
| |
| // [bruce] prior to 7.0.1 we did not perform suspect processing on members that fail to |
| // respond to a new view. In 7.0.1 we require timely responses to view changes |
| // in order to detect loss of quorum. |
| // In 8.0 this is changed to not do suspect processing if the coordinator is shutting down |
| if (failedToAck.size() > 0 && !stack.getChannel().closing()) { |
| if ( !checkFailedToAckMembers(new_view, failedToAck) ) { |
| return false; |
| } |
| } |
| if (this.joined) { // bug #44491 - don't declare a network partition if we're still starting up |
| // okay, others have acked that they're ready for the view, so we know |
| // at this point who was around after loss of the members in the failures set |
| int oldWeight = 0; |
| // Map<IpAddress, Integer> oldWeights = new HashMap<IpAddress, Integer>(); |
| boolean leadProcessed = false; |
| boolean displayWeights = (failedWeight > 0) && !Boolean.getBoolean("gemfire.hide-member-weights"); |
| StringBuffer sb = displayWeights? new StringBuffer(1000) : null; |
| for (Iterator it = this.view.getMembers().iterator(); it.hasNext(); ) { |
| IpAddress a = (IpAddress)it.next(); |
| int thisWeight = a.getMemberWeight(); |
| if (a.getVmKind() == 10 /* NORMAL_DM_KIND */) { |
| thisWeight += 10; |
| if (!leadProcessed) { |
| thisWeight += 5; |
| leadProcessed = true; |
| } |
| } else if (a.preferredForCoordinator()) { |
| thisWeight += 3; |
| } |
| oldWeight += thisWeight; |
| if (displayWeights) { |
| sb.append("\n") |
| .append(ExternalStrings.GMS_MEMBER_0_HAS_WEIGHT_1 |
| .toLocalizedString(a, Integer.valueOf(thisWeight))); |
| } |
| } |
| if (sb != null) { |
| String str = sb.toString(); |
| if (str.length() > 0) { |
| log.getLogWriter().info(GFStringIdImpl.LITERAL, str); |
| } |
| } |
| int lossThreshold = (int)Math.round((oldWeight * this.partitionThreshold) / 100.0); |
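| // e.g. with the default partitionThreshold of 51 and a previous total weight of 100, lossThreshold is 51, |
| // so losing members whose combined weight is 51 or more triggers partition handling |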
| |
| if (failedWeight > 0) { |
| log.getLogWriter().info(ExternalStrings.NetworkPartitionDetectionWeightCalculation, |
| new Object[] {Integer.valueOf(oldWeight), Integer.valueOf(failedWeight), |
| Integer.valueOf(this.partitionThreshold), |
| lossThreshold}); |
| |
| if (failedWeight >= lossThreshold) { |
| if (this.splitBrainDetectionEnabled) { |
| this.networkPartitionDetected = true; |
| sendNetworkPartitionWarning(new_view.getMembers()); |
| quorumLost(failures, this.view); |
| forceDisconnect(new Event(Event.EXIT, stack.gfBasicFunctions.getForcedDisconnectException( |
| ExternalStrings.GMS_EXITING_DUE_TO_POSSIBLE_NETWORK_PARTITION_EVENT_DUE_TO_LOSS_OF_MEMBER_0 |
| .toLocalizedString(failures.size(), failures)))); |
| return false; |
| } // else quorumLost will be invoked when the view is installed |
| } |
| } |
| } |
| |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("done successfully preparing view " + new_view.getVid()); |
| } |
| return true; |
| } |
| |
| private boolean checkFailedToAckMembers(final View new_view, final List<IpAddress> failedToAck) { |
| |
| if (failedToAck.size() == 0) { |
| return true; |
| } |
| |
| if (log.getLogWriter().infoEnabled()) { |
| log.getLogWriter().info(ExternalStrings.CHECKING_UNRESPONSIVE_MEMBERS, |
| new Object[]{failedToAck}); |
| } |
| final FD_SOCK fdSock = (FD_SOCK)this.stack.findProtocol("FD_SOCK"); |
| if (this.stack.getChannel().closing() && fdSock == null) { |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("FD_SOCK not found for liveness checks - aborting view " + new_view.getVid()); |
| } |
| return false; // bug #44786: cannot prepare a view if the stack has been dismantled |
| } |
| assert fdSock != null; |
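| // GemStoneAddition - check each unresponsive member in parallel: try to connect to its FD_SOCK |
| // ping address (or its direct port if no ping address is known); members whose host responds are |
| // removed from the failed-to-ack list |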
| ExecutorService es = Executors.newFixedThreadPool(failedToAck.size(), new ThreadFactory() { |
| private final AtomicInteger threadCount = new AtomicInteger(1); |
| @Override |
| public Thread newThread(Runnable r) { |
| Thread th = new Thread(GemFireTracer.GROUP, |
| r, |
| FAILED_TO_ACK_MEMBER_VERIFY_THREAD + "-" + threadCount.getAndIncrement()); |
| return th; |
| } |
| }); |
| ArrayList<Callable<IpAddress>> al = new ArrayList<Callable<IpAddress>>(); |
| for (final Iterator<IpAddress> it=failedToAck.iterator(); it.hasNext(); ) { |
| final IpAddress failedAddress = it.next(); |
| al.add(new Callable<IpAddress>() { |
| IpAddress sockAddress = fdSock.fetchPingAddress(failedAddress, 0); |
| public IpAddress call() throws Exception { |
| log.getLogWriter().info(ExternalStrings.SUSPECTING_MEMBER_WHICH_DIDNT_ACK, new Object[]{failedAddress.toString()}); |
| if (sockAddress == null) { |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("unable to find ping address for " + failedAddress |
| + " - using direct port to verify if it's there"); |
| } |
| // fdSock can't perform the verification because it has no FD_SOCK |
| // address for the member |
| // If we can connect to the member's cache socket, then we know its machine is |
| // up. It may be dead and its socket port reused, but we know there isn't a |
| // network partition going on. |
| sockAddress = new IpAddress(failedAddress.getIpAddress(), failedAddress.getDirectPort()); |
| if (sockAddress.getPort() != 0 && |
| fdSock.checkSuspect(failedAddress, sockAddress, ExternalStrings.MEMBER_DID_NOT_ACKNOWLEDGE_VIEW.toLocalizedString(), false, false)) { |
| if (log.getLogWriter().infoEnabled()) { |
| log.getLogWriter().info(ExternalStrings.ABLE_TO_CONNECT_TO_DC_PORT, new Object[]{failedAddress, sockAddress.getPort()}); |
| } |
| return failedAddress; // the member's host responded, so it is removed from failedToAck below via the returned Future |
| } |
| } else if (fdSock.checkSuspect(failedAddress, sockAddress, ExternalStrings.MEMBER_DID_NOT_ACKNOWLEDGE_VIEW.toLocalizedString(), true, false)) { |
| if (log.getLogWriter().infoEnabled()) { |
| log.getLogWriter().info(ExternalStrings.ABLE_TO_CONNECT_TO_FD_PORT, new Object[]{failedAddress, sockAddress.getPort()}); |
| } |
| return failedAddress; // the member's FD_SOCK responded, so it is removed from failedToAck below via the returned Future |
| } |
| return null; |
| } |
| } |
| ); |
| } |
| try { |
| List<java.util.concurrent.Future<IpAddress>> futures = es.invokeAll(al); |
| for(java.util.concurrent.Future<IpAddress> future : futures){ |
| try { |
| IpAddress ipAddr = future.get(view_ack_collection_timeout + 100, TimeUnit.MILLISECONDS); |
| if(ipAddr != null) { |
| failedToAck.remove(ipAddr); |
| } |
| } catch (ExecutionException e) { |
| } catch (java.util.concurrent.TimeoutException e) { |
| } |
| } |
| } catch (InterruptedException e) { |
| } |
| |
| try{ |
| es.shutdown(); |
| es.awaitTermination(view_ack_collection_timeout , TimeUnit.MILLISECONDS); |
| }catch(Exception ex) { |
| //ignore |
| } |
| // we could, at this point, also see if there are any ip addresses in the |
| // set that ack'd the message that are also in the failed set. That would |
| // tell us if the route to that NIC is still viable and we could remove the |
| // entry from the failedToAck set [bruce 2011] |
| if (!failedToAck.isEmpty()) { |
| // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "these cache processes failed to ack a view preparation message: " + failedToAck); |
| // abandon this view and cast a new view with the failed ackers added to the suspects list |
| failedToAck.addAll(new_view.getSuspectedMembers()); |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("invoking handleLeave with " + failedToAck + ". My membership is " + this.members.getMembers()); |
| } |
| this.impl.handleLeave(failedToAck, true, |
| Collections.singletonList(ExternalStrings.MEMBER_DID_NOT_ACKNOWLEDGE_VIEW.toLocalizedString()), |
| true); |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("done casting view " + new_view.getVid()); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * process the given set of failures against the given last known view and report the |
| * amount of lost weight. This also removes any admin members from the collection of failures. |
| * |
| * @param lastView the last View that was completely installed |
| * @param leader the leader process in lastView |
| * @param failures the set of failures in the view being created |
| * @return the amount of weight lost in the failures collection |
| */ |
| public static int processFailuresAndGetWeight(View lastView, Address leader, Collection failures) { |
| int failedWeight = 0; |
| for (Iterator<IpAddress> it=failures.iterator(); it.hasNext(); ) { |
| IpAddress addr = it.next(); |
| if (!lastView.getMembers().contains(addr)) { // bug #37342: added and removed since the last installed view |
| continue; |
| } |
| failedWeight += addr.getMemberWeight(); |
| if (addr.getVmKind() != 10 /* NORMAL_DM_KIND */) { |
| if (addr.preferredForCoordinator()) { |
| failedWeight += 3; |
| } |
| it.remove(); |
| } else { |
| failedWeight += 10; |
| } |
| } |
| if (leader != null && failures.contains(leader)) { |
| failedWeight += 5; |
| } |
| return failedWeight; |
| } |
| |
| // GemStoneAddition - notify of loss of quorum |
| private void quorumLost(final Set failures, final View currentView) { |
| Thread notificationThread = new Thread(GemFireTracer.GROUP, "Quorum Lost Notification") { |
| public void run() { |
| List remaining = new ArrayList(currentView.getMembers().size()); |
| remaining.addAll(currentView.getMembers()); |
| remaining.removeAll(failures); |
| try { |
| stack.gfPeerFunctions.quorumLost(failures, remaining); |
| } catch (RuntimeException e) { |
| if (e.getClass().getSimpleName().equals("CancelException")) { |
| // bug #47403: ignore this exception - cache closed before notification went through |
| } else { |
| throw e; |
| } |
| } |
| } |
| }; |
| notificationThread.setDaemon(true); |
| notificationThread.start(); |
| } |
| |
| /** |
| * GemStoneAddition - process a view received in a PREPARE_VIEW message from another |
| * coordinator, or from a PREPARE_VIEW_ACK response to a message |
| * that this node transmitted to another member |
| * @param pView the view being prepared |
| * @param new_view the view I am trying to cast |
| * @param newMbrs the members list in new_view |
| * @param mcast whether the view can be multicast |
| * @return true if new_view can be installed, false if a different view was installed |
| */ |
| boolean processPreparedView(View pView, View new_view, List newMbrs, boolean mcast) { |
| View prevView = this.previousPreparedView; |
| if (pView != null // different view has been prepared |
| && !pView.getCreator().equals(this.local_addr) // I did not create it |
| && pView.getVid().compare(this.view.getVid()) > 0 // It's newer than my current view |
| && (prevView == null || pView.getVid().compare(prevView.getVid()) > 0) ) { // I haven't processed a newer prepared view |
| |
| this.previousPreparedView = pView; |
| |
| Vector newMembersFromPView = new Vector(); |
| Vector newFailuresFromPView = new Vector(); |
| Vector newLeavingFromPView = new Vector(); |
| synchronized(members) { |
| // someone else has already prepared a new view |
| if (log.getLogWriter().infoEnabled()) { |
| log.getLogWriter().info(ExternalStrings.RECEIVED_PREVIOUSLY_PREPARED_VIEW, pView); |
| } |
| Set<Address> allFailures = new HashSet(this.leaving); |
| for (Iterator it=pView.getMembers().iterator(); it.hasNext(); ) { |
| Address a = (Address)it.next(); |
| if (!allFailures.contains(a)) { |
| // in the other view and not in my failure list - new member |
| newMembersFromPView.add(a); |
| } |
| } |
| for (Iterator it=pView.getSuspectedMembers().iterator(); it.hasNext(); ) { |
| Address a = (Address)it.next(); |
| if (!allFailures.contains(a)) { |
| // failed in other view but not in mine |
| newFailuresFromPView.add(a); |
| } |
| } |
| for (Iterator it=newMbrs.iterator(); it.hasNext(); ) { |
| Address a = (Address)it.next(); |
| if (!pView.containsMember(a)) { |
| newLeavingFromPView.add(a); |
| } |
| } |
| } |
| if (!newMembersFromPView.isEmpty() |
| || !newFailuresFromPView.isEmpty() |
| || !newLeavingFromPView.isEmpty()) { |
| // we need the full list of failures but the others are just increments on |
| // the leaving/joining collections in tmp_mbrs |
| newFailuresFromPView.addAll(new_view.getSuspectedMembers()); |
| synchronized(prepare_collector) { |
| prepare_collector.fullyReset(); // GemStoneAddition - fix for #43886 |
| } |
| castViewChange(newMembersFromPView, newLeavingFromPView, newFailuresFromPView, mcast); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * GemStoneAddition - send prepare-for-view-change message and wait for |
| * responses. Return a list of addresses that didn't respond |
| */ |
| List<IpAddress> sendPrepareForViewChange(View newView, Collection<IpAddress> mbrs, boolean mcast) { |
| int size=-1; |
| ViewId vid = newView.getVid(); |
| |
| synchronized (prepare_collector) { // GemStoneAddition bug34505 |
| prepare_collector.reset(vid, mbrs); |
| size=prepare_collector.size(); |
| } |
| boolean timedout=false; |
| GmsHeader hdr=new GmsHeader(GmsHeader.PREPARE_FOR_VIEW); |
| // GemStoneAddition - when PWNing an existing view, we have to unicast |
| // because NAKACK will reject a mcast message from a non-member |
| if (mcast) { |
| Message view_change_msg=new Message(); // bcast to all members |
| view_change_msg.setObject(newView); |
| view_change_msg.isHighPriority = true; |
| view_change_msg.putHeader(name, hdr); |
| passDown(new Event(Event.MSG, view_change_msg)); |
| } |
| else { |
| for (IpAddress dest: mbrs) { |
| if (dest.equals(this.local_addr)) { |
| synchronized(prepare_collector) { |
| prepare_collector.ack(dest, null); |
| } |
| } else { |
| Message msg = new Message(); |
| msg.isHighPriority = true; |
| msg.putHeader(name, hdr); |
| msg.setObject(newView); |
| msg.setDest(dest); |
| // if (trace) |
| // log.trace("sending PREPARE_FOR_VIEW to " + dest); |
| passDown(new Event(Event.MSG, msg)); |
| } |
| } |
| } |
| try { |
| // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "DEBUG: before waiting for prepare-acks, collector state is " + prepare_collector); |
| prepare_collector.waitForAllAcks(view_ack_collection_timeout); |
| // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "DEBUG: after waiting for prepare-acks, collector state is " + prepare_collector); |
| } |
| catch(TimeoutException e) { |
| // timeout processing is handled below, along with members declared dead during view preparation |
| // see bug #47295 |
| timedout=true; |
| } |
| //log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "received acks from " + prepare_collector.getReceived()); |
| List missing; |
| String missingStr; |
| String receivedStr = null; |
| boolean logMissing = false; |
| synchronized (prepare_collector) { |
| missing = prepare_collector.getMissingAcks(); |
| missing.addAll(prepare_collector.getSuspectedMembers()); |
| missing.removeAll(newView.getSuspectedMembers()); // don't reprocess members that are already kicked out |
| missing.remove(this.local_addr); // don't suspect myself - I might be shutting down |
| // bug #44342 - if the member has sent a shutdown message then it's okay |
| for (Iterator<IpAddress> it=missing.iterator(); it.hasNext(); ) { |
| IpAddress addr = it.next(); |
| if (!newView.containsMember(addr) || this.stack.gfPeerFunctions.isShuttingDown(addr)) { |
| it.remove(); |
| } |
| } |
| logMissing = missing.size() > 0; |
| if (logMissing) { |
| // note who has acked while under synchronization |
| receivedStr = prepare_collector.getReceived().toString(); |
| } |
| } |
| if (logMissing && !this.stack.getChannel().closing()) { |
| missingStr = missing.toString(); |
| log.getLogWriter().warning( |
| ExternalStrings.GMS_FAILED_TO_COLLECT_ALL_ACKS_0_FOR_VIEW_PREPARATION_1_AFTER_2_MS_MISSING_ACKS_FROM_3_RECEIVED_4_LOCAL_ADDR_5, |
| new Object[] {Integer.valueOf(size), newView, Long.valueOf(view_ack_collection_timeout), missingStr, receivedStr, local_addr}/*, new Exception("stack trace")*/); |
| } |
| return missing; |
| } |
| |
| |
| void sendNetworkPartitionWarning(Collection<IpAddress> mbrs) { |
| GmsHeader hdr = new GmsHeader(GmsHeader.NETWORK_PARTITION_DETECTED); |
| for (IpAddress dest: mbrs) { |
| Message msg = new Message(); |
| msg.isHighPriority = true; |
| msg.putHeader(name, hdr); |
| msg.setDest(dest); |
| passDown(new Event(Event.MSG, msg)); |
| } |
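| // pause before returning, presumably to give the warning time to reach the other members before this coordinator disconnects |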
| try { |
| Thread.sleep(this.leave_timeout); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); // pass the problem to other code |
| } |
| } |
| |
| /** |
| * GemStoneAddition - suspect members that fail to return view acks |
| */ |
| protected void suspect(List suspects) { |
| passDown(new Event(Event.GMS_SUSPECT, suspects)); |
| } |
| |
| |
| /** |
| * GemStoneAddition - tell the ack collectors to stop waiting for the |
| * given address |
| */ |
| public void addSuspectToCollectors(Address addr) { |
| synchronized (ack_collector) { |
| ack_collector.suspect(addr); |
| } |
| synchronized(prepare_collector) { |
| prepare_collector.suspect(addr); |
| } |
| } |
| /** |
| * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass |
| * of View), then digest will be non-null and has to be set before installing the view. |
| */ |
| public void installView(View new_view, Digest digest) { |
| if(digest != null) |
| mergeDigest(digest); |
| installView(new_view); |
| } |
| |
| |
| /** GemStoneAddition - return the total weight of the given member IDs: each member |
| * contributes its own member weight, plus 10 if it is a cache member (NORMAL_DM_KIND), |
| * plus 5 more if it is the lead member, or plus 3 if it is a non-cache member preferred |
| * for coordinator */ |
| public static int getWeight(Collection ids, Address leader) { |
| int weight = 0; |
| for (Iterator<IpAddress> it=ids.iterator(); it.hasNext(); ) { |
| IpAddress addr = it.next(); |
| int thisWeight = addr.getMemberWeight(); |
| if (addr.getVmKind() == 10 /* NORMAL_DM_KIND */) { |
| thisWeight += 10; |
| if (leader != null && addr.equals(leader)) { |
| thisWeight += 5; |
| } |
| } else if (addr.preferredForCoordinator()) { |
| thisWeight += 3; |
| } |
| weight += thisWeight; |
| } |
| return weight; |
| } |
| |
| /** |
| * Sets the new view and sends a VIEW_CHANGE event up and down the stack. |
| */ |
| public void installView(View new_view) { |
| Address coord; |
| int rc; |
| ViewId vid=new_view.getVid(); |
| Vector mbrs=new_view.getMembers(); |
| |
| if (networkPartitionDetected) { |
| return; // GemStoneAddition - do not install the view if we've decided to exit |
| } |
| this.preparedView = null; |
| |
| if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view); |
| //log.getLogWriter().info("installing view " + new_view); // debugging |
| if(stats) { |
| num_views++; |
| prev_views.add(new_view); |
| } |
| |
| // Discard views with an id lower than or equal to our own. The first view is installed without this check |
| if(view_id != null) { |
| rc=vid.compareTo(view_id); |
| if(rc <= 0) { |
| if(log.isTraceEnabled() && rc < 0) // only scream if view is smaller, silently discard same views |
| log.trace("[" + local_addr + "] received view < current view;" + |
| " discarding it (current vid: " + view_id + ", new vid: " + vid + ')'); |
| return; |
| } |
| } |
| |
| ltime=Math.max(vid.getId(), ltime); // compute Lamport logical time |
| |
| /* Check for self-inclusion: if I'm not part of the new membership, I just discard it. |
| This ensures that messages sent in view V1 are only received by members of V1 */ |
| if(checkSelfInclusion(mbrs) == false) { |
| // only shun if this member was previously part of the group. avoids problem where multiple |
| // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view |
| // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership |
| // bela Nov 20 2003 |
| if(shun && local_addr != null && prev_members.contains(local_addr)) { |
| if(warn) |
| log.warn("I (" + local_addr + ") am not a member of view " + new_view + |
| ", shunning myself and leaving the group (prev_members are " + prev_members + |
| ", current view is " + view + ")"); |
| if(impl != null) |
| impl.handleExit(); |
| networkPartitionDetected = true; |
| passUp(new Event(Event.EXIT, |
| stack.gfBasicFunctions.getForcedDisconnectException( |
| "This member has been forced out of the distributed system by " + new_view.getCreator() |
| + ". Please consult GemFire logs to find the reason. (GMS shun)"))); |
| } |
| else { |
| if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + "; discarding view"); |
| } |
| return; |
| } |
| |
| Event view_event; |
| synchronized(installViewLock) { |
| synchronized(members) { // serialize access to views |
| |
| if (this.view != null) { |
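| // GemStoneAddition - quorum check at installation time: if the weight of the suspected members |
| // reaches the partition threshold of the previous view's total weight, notify listeners that |
| // quorum has been lost |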
| int oldWeight = getWeight(this.view.getMembers(), this.leader); |
| int failedWeight = getWeight(new_view.getSuspectedMembers(), this.leader); |
| int lossThreshold = (int)Math.round((oldWeight * this.partitionThreshold) / 100.0); |
| if (failedWeight >= lossThreshold) { |
| log.getLogWriter().info(ExternalStrings.DEBUG, "old membership weight=" + oldWeight + ", loss threshold=" + lossThreshold+" and failed weight=" + failedWeight); |
| // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "current view="+this.view+"; suspects="+new_view.getSuspectedMembers()); |
| quorumLost(new_view.getSuspectedMembers(), this.view); |
| } |
| } |
| |
| // assign new_view to view_id |
| if(new_view instanceof MergeView) |
| view=new View(new_view.getVid(), new_view.getMembers()); |
| else |
| view=new_view; |
| view_id=vid.copy(); |
| |
| // GemStoneAddition - tracking of leader |
| Address oldLead = this.leader; |
| this.leader = view.getLeadMember(); |
| if (this.leader != null) { |
| // log acquisition of new leader |
| if (oldLead == null || !oldLead.equals(this.leader)) { |
| log.getLogWriter().info(ExternalStrings.GMS_MEMBERSHIP_LEADER_MEMBER_IS_NOW_0, this.leader); |
| } |
| } |
| |
| |
| // Set the membership. Take into account joining members |
| if(mbrs != null && mbrs.size() > 0) { |
| members.set(mbrs); |
| this.coordinator = members.getCoordinator(); // GemStoneAddition bug #44967 |
| tmp_members.set(members); |
| joining.removeAll(mbrs); // remove all members in mbrs from joining |
| // remove all elements from 'leaving' that are not in 'mbrs' |
| leaving.retainAll(mbrs); |
| |
| tmp_members.add(joining); // add members that haven't yet shown up in the membership |
| tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership |
| |
| // add to prev_members |
| for(Iterator it=mbrs.iterator(); it.hasNext();) { |
| Address addr=(Address)it.next(); |
| if(!prev_members.contains(addr)) |
| prev_members.add(addr); |
| } |
| } |
| |
| view_event=new Event(Event.VIEW_CHANGE, new_view.clone()); |
| } |
| |
| coord=determineCoordinator(); |
| // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) { |
| // changed on suggestion by yaronr and Nicolas Piedeloupe |
| if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) { |
| // GemStoneAddition - pass suspects to coordinator impl |
| Vector suspects; |
| if (haveParticipantRole()) { |
| suspects = ((ParticipantGmsImpl)impl).getSuspects(); |
| } else { |
| suspects = new Vector(); |
| } |
| becomeCoordinator(suspects); |
| coord = local_addr; // GemStoneAddition |
| } |
| else { |
| if(haveCoordinatorRole() && !local_addr.equals(coord)) |
| becomeParticipant(); |
| } |
| // GemStoneAddition - notify concerned parties of the current coordinator |
| if (coord != null) { |
| notifyOfCoordinator(coord); |
| } |
| } |
| // Send VIEW_CHANGE event up and down the stack: |
| // (moved from inside synchronization for bug #52099) |
| if (view_event != null) { |
| passDown(view_event); // needed e.g. by failure detector or UDP |
| passUp(view_event); |
| } |
| } |
| |
| |
| void forceDisconnect(final Event exitEvent) { |
| this.networkPartitionDetected = true; |
| Thread tilt = new Thread(Thread.currentThread().getThreadGroup(), |
| "GMS Network Partition Event") { |
| @Override // GemStoneAddition |
| public void run() { |
| passDown(exitEvent); |
| passUp(exitEvent); |
| } |
| }; |
| tilt.start(); |
| } |
| |
| /** |
| * GemStoneAddition - rewritten to allow the view coordinator to be located in locator |
| * processes, and made public |
| * @return the acting coordinator |
| */ |
| public Address determineCoordinator() { |
| // return members != null && members.size() > 0? (Address)members.elementAt(0) : null; |
| return this.coordinator; |
| } |
| |
| /** |
| * GemStoneAddition - return the lead member for partition detection algorithms |
| */ |
| public Address getLeadMember() { |
| return this.leader; |
| } |
| |
| /** |
| * GemStoneAddition - retrieve the partitionThreshold for quorum calculations |
| */ |
| public int getPartitionThreshold() { |
| return this.partitionThreshold; |
| } |
| |
| /** |
| * GemStoneAddition - retrieve the latest view |
| */ |
| public View getLastView() { |
| return this.view; |
| } |
| |
| /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */ |
| protected boolean wouldBeNewCoordinator(Address potential_new_coord) { |
| // Address new_coord; |
| |
| if(potential_new_coord == null) return false; |
| |
| synchronized(members) { |
| if(members.size() < 2) return false; |
| // new_coord=(Address)members.elementAt(1); // member at 2nd place |
| // return new_coord != null && new_coord.equals(potential_new_coord); |
| return members.wouldBeNewCoordinator(potential_new_coord); |
| } |
| } |
| |
| |
| /** Returns true if local_addr is member of mbrs, else false */ |
| protected boolean checkSelfInclusion(Vector mbrs) { |
| Object mbr; |
| if(mbrs == null) |
| return false; |
| for(int i=0; i < mbrs.size(); i++) { |
| mbr=mbrs.elementAt(i); |
| if(mbr != null && local_addr.equals(mbr)) |
| return true; |
| } |
| return false; |
| } |
| |
| |
| // GemStoneAddition - this method is not used by jgroups |
| // public View makeView(Vector mbrs) { |
| // Address coord=null; |
| // long id=0; |
| // |
| // if(view_id != null) { |
| // coord=view_id.getCoordAddress(); |
| // id=view_id.getId(); |
| // } |
| // return new View(coord, id, mbrs); |
| // } |
| |
| |
| // GemStoneAddition - this method is not used by jgroups |
| // public View makeView(Vector mbrs, ViewId vid) { |
| // Address coord=null; |
| // long id=0; |
| // |
| // if(vid != null) { |
| // coord=vid.getCoordAddress(); |
| // id=vid.getId(); |
| // } |
| // return new View(coord, id, mbrs); |
| // } |
| |
| |
| /** Send down a SET_DIGEST event */ |
| public void setDigest(Digest d) { |
| passDown(new Event(Event.SET_DIGEST, d)); |
| } |
| |
| |
| /** Send down a MERGE_DIGEST event */ |
| public void mergeDigest(Digest d) { |
| passDown(new Event(Event.MERGE_DIGEST, d)); |
| } |
| |
| |
| /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or |
| timeout, whichever occurs first */ |
| public Digest getDigest() { |
| Digest ret=null; |
| |
| synchronized(digest_mutex) { |
| digest_promise.reset(); |
| passDown(Event.GET_DIGEST_EVT); |
| try { |
| ret=(Digest)digest_promise.getResultWithTimeout(digest_timeout); |
| } |
| catch(TimeoutException e) { |
| if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_DIGEST_COULD_NOT_BE_FETCHED_FROM_BELOW); |
| } |
| return ret; |
| } |
| } |
| |
| |
| @Override // GemStoneAddition |
| public void up(Event evt) { |
| Object obj; |
| Message msg; |
| GmsHeader hdr; |
| MergeData merge_data; |
| |
| switch(evt.getType()) { |
| |
| case Event.MSG: |
| msg=(Message)evt.getArg(); |
| obj=msg.getHeader(name); |
| if(obj == null || !(obj instanceof GmsHeader)) |
| break; |
| hdr=(GmsHeader)msg.removeHeader(name); |
| switch(hdr.type) { |
| case GmsHeader.JOIN_REQ: |
| if (this.haveCoordinatorRole()) { |
| // GemStoneAddition - partial fix for bugs #41722 and #42009 |
| IpAddress iaddr = (IpAddress)hdr.mbr; |
| if (iaddr.getBirthViewId() >= 0 && this.stack.gfPeerFunctions.isShunnedMemberNoSync(iaddr)) { |
| log.getLogWriter().info( |
| ExternalStrings. COORDGMSIMPL_REJECTING_0_DUE_TO_REUSED_IDENTITY, hdr.mbr); |
| ((CoordGmsImpl)getImpl()).sendJoinResponse(new JoinRsp(JoinRsp.SHUNNED_ADDRESS), hdr.mbr, false); |
| return; |
| } |
| if (members.contains(hdr.mbr)) { |
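| // GemStoneAddition - the joining address matches an existing member; if its unique ID differs |
| // it is a new process reusing the old identity and is rejected, otherwise treat this as a |
| // duplicate join from the current member |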
| IpAddress hmbr = (IpAddress)hdr.mbr; |
| for (Iterator it=members.getMembers().iterator(); it.hasNext(); ) { |
| IpAddress addr = (IpAddress)it.next(); |
| if (addr.equals(hdr.mbr)) { |
| if (addr.getUniqueID() != hmbr.getUniqueID()) { |
| log.getLogWriter().info( |
| ExternalStrings. COORDGMSIMPL_REJECTING_0_DUE_TO_REUSED_IDENTITY, hdr.mbr); |
| ((CoordGmsImpl)getImpl()).sendJoinResponse(new JoinRsp(JoinRsp.SHUNNED_ADDRESS), hdr.mbr, false); |
| return; |
| } else { |
| break; |
| } |
| } |
| } |
| getImpl().handleAlreadyJoined(hdr.mbr); |
| return; |
| } |
| } |
| // GemStoneAddition - bug #50510, 50742 |
| if (hdr.mbr.getVersionOrdinal() > 0 |
| && hdr.mbr.getVersionOrdinal() < JGroupsVersion.CURRENT_ORDINAL) { |
| log.getLogWriter().warning( |
| ExternalStrings.COORD_REJECTING_OLD_MEMBER_BECAUSE_UPGRADE_HAS_BEGUN, |
| new Object[]{hdr.mbr}); |
| ((CoordGmsImpl)getImpl()).sendJoinResponse( |
| new JoinRsp(ExternalStrings.COORD_REJECTING_OLD_MEMBER_BECAUSE_UPGRADE_HAS_BEGUN |
| .toLocalizedString(hdr.mbr)), hdr.mbr, false); |
| return; |
| } |
| view_handler.add(new Request(Request.JOIN, hdr.mbr, false, null)); |
| break; |
| case GmsHeader.JOIN_RSP: |
| // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "join response message received: " + hdr.join_rsp); |
| impl.handleJoinResponse((JoinRsp)msg.getObject()); |
| break; |
| case GmsHeader.LEAVE_REQ: |
| if(log.isDebugEnabled()) |
| log.debug("received LEAVE_REQ for " + hdr.mbr + " from " + msg.getSrc()); |
| if(hdr.mbr == null) { |
| if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_LEAVE_REQS_MBR_FIELD_IS_NULL); |
| return; |
| } |
| synchronized(ack_collector) { |
| ack_collector.ack(hdr.mbr, null); // GemStoneAddition - don't wait for acks from this member |
| } |
| synchronized(prepare_collector) { |
| prepare_collector.ack(hdr.mbr, null); // GemStoneAddition - don't wait for acks from this member (bug #44342) |
| } |
| view_handler.add(new Request(Request.LEAVE, hdr.mbr, false, null)); |
| break; |
| case GmsHeader.LEAVE_RSP: |
| // GemStoneAddition |
| if (hdr.arg != null) { |
| // do some checking to prevent a message intended for another process |
| // from shutting this one down |
| if (hdr.mbr != null && !hdr.mbr.equals(this.local_addr)) { |
| break; |
| } |
| } |
| impl.handleLeaveResponse(hdr.arg); |
| break; |
| case GmsHeader.REMOVE_REQ: // GemStoneAddition - new REMOVE_REQ request for slow receivers |
| // REMOVE_REQ differs from LEAVE_REQ in that the member is suspect |
| // and will register as a failure in network partition detection |
| // algorithms [bruce] |
| if(hdr.mbr == null) { |
| if(log.isErrorEnabled()) log.error("REMOVE_REQ's mbr field is null"); |
| return; |
| } |
| if (members.contains(msg.getSrc())) { |
| if (msg.getSrc().equals(hdr.mbr) && log.isDebugEnabled()) { |
| log.debug("received REMOVE_REQ for " + hdr.mbr + " from " + msg.getSrc() + |
| (hdr.arg == null? "" : " Reason=" + hdr.arg)); |
| } |
| else { |
| log.getLogWriter().warning( |
| ExternalStrings.GMS_MEMBERSHIP_RECEIVED_REQUEST_TO_REMOVE_0_FROM_1_2, |
| new Object[] { hdr.mbr, msg.getSrc(), (hdr.arg == null? "" : " Reason=" + hdr.arg)}); |
| |
| } |
| view_handler.add(new Request(Request.LEAVE, hdr.mbr, true, null, hdr.arg)); |
| } |
| break; |
| case GmsHeader.VIEW: |
| // send VIEW_ACK to sender of view |
| Address coord=msg.getSrc(); |
| Message view_ack=new Message(coord, null, null); |
| view_ack.isHighPriority = true; |
| View v = msg.getObject(); |
| GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK); |
| view_ack.putHeader(name, tmphdr); |
| view_ack.setObject(v); |
| if (this.local_addr.getBirthViewId() < 0) { |
| // unicast can't handle changing view IDs very well |
| view_ack.putHeader(UNICAST.BYPASS_UNICAST, tmphdr); |
| } |
| passDown(new Event(Event.MSG, view_ack)); |
| // GemStoneAddition - perform null check AFTER sending an ack |
| // so we don't stall a coordinator when join_rsp hasn't been |
| // received |
| if(v == null) { |
| if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_VIEW_VIEW__NULL); |
| return; |
| } |
| |
| // ViewId newId = hdr.view.getVid(); GemStoneAddition |
| if (this.impl != null && !(this.impl instanceof ClientGmsImpl) |
| && v.getVid().compareTo(view_id) > 0 |
| && !this.stack.getChannel().closing() /* GemStoneAddition - fix for bug #42969*/) { |
| log.getLogWriter().info(ExternalStrings.GMS_MEMBERSHIP_RECEIVED_NEW_VIEW__0, v); |
| impl.handleViewChange(v, v.getMessageDigest()); |
| } |
| break; |
| |
| case GmsHeader.VIEW_ACK: |
| Object sender=msg.getSrc(); |
| if (trace) // GemStoneAddition - debug 34750 |
| log.trace("Received VIEW_ACK from " + sender); |
| v = msg.getObject(); // GemStoneAddition - ack the correct view |
| ViewId vid = v==null? null : v.getVid(); |
| synchronized (ack_collector) { // GemStoneAddition bug34505 |
| ack_collector.ack(sender, vid); |
| } |
| return; // don't pass further up |
| |
| |
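              // Two-phase view support: the coordinator first asks members to prepare the pending
              // view; a member that already holds a different prepared view returns that view in
              // its PREPARE_FOR_VIEW_ACK instead of accepting the new one.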
| case GmsHeader.PREPARE_FOR_VIEW: // GemStoneAddition |
| v = msg.getObject(); |
| if (trace) |
| log.trace("Received PREPARE_FOR_VIEW from " + msg.getSrc()+" for view " + v); |
| { |
| GmsHeader responseHeader = new GmsHeader(GmsHeader.PREPARE_FOR_VIEW_ACK); |
| View responseView = null; |
| if (!msg.getSrc().equals(this.local_addr)) { |
| if (this.preparedView != null && !v.getCreator().equals(this.preparedView.getCreator())) { |
| // already have a prepared view - don't accept this one |
| responseView = this.preparedView; |
| this.preparedView = null; |
| } else { |
| this.preparedView = v; |
| } |
| } |
| Message m = new Message(true); |
| m.putHeader(name, responseHeader); |
| m.setDest(msg.getSrc()); |
| m.setObject(responseView); |
| passDown(new Event(Event.MSG, m)); |
| return; |
| } |
| |
| case GmsHeader.PREPARE_FOR_VIEW_ACK: // GemStoneAddition - two phase views |
| { |
| sender=msg.getSrc(); |
| v = (View)msg.getObject(); |
| if (trace) { |
| String preparedViewString = v == null? "" : |
| " with conflicting view " + v; |
| log.trace("Received PREPARE_FOR_VIEW_ACK from " + sender + preparedViewString); |
| } |
| vid = null; |
| if (v != null) { |
| // the sender had a conflicting view in preparation |
| this.preparedView = v; |
| vid = v.getVid(); |
| } |
| synchronized (prepare_collector) { |
| prepare_collector.ack(sender, vid); |
| } |
| return; // don't pass further up |
| } |
| case GmsHeader.NETWORK_PARTITION_DETECTED: // GemStoneAddition |
| { |
| forceDisconnect(new Event(Event.EXIT, stack.gfBasicFunctions.getForcedDisconnectException(ExternalStrings.COORDINATOR_DECLARED_NETWORK_PARTITION_EVENT.toLocalizedString(msg.getSrc())))); |
| return; |
| } |
| // GemStoneAddition - explicit view request |
| case GmsHeader.GET_VIEW: |
| { |
| View msgView = this.view; |
| if (msgView != null) { |
| Message viewRsp = new Message(); |
| JoinRsp vrsp = new JoinRsp(msgView, getDigest()); |
| GmsHeader ghdr = new GmsHeader(GmsHeader.GET_VIEW_RSP); |
| viewRsp.putHeader(name, ghdr); |
| viewRsp.setDest(msg.getSrc()); |
| viewRsp.setObject(vrsp); |
| passDown(new Event(Event.MSG, viewRsp)); |
| log.getLogWriter().info(ExternalStrings.DEBUG, "Sending membership view to " + viewRsp.getDest() + ": " + msgView); |
| } |
| break; |
| } |
| // GemStoneAddition - response to view request |
| case GmsHeader.GET_VIEW_RSP: |
| { |
| impl.handleGetViewResponse((JoinRsp)msg.getObject()); |
| break; |
| } |
| case GmsHeader.MERGE_REQ: |
| impl.handleMergeRequest(msg.getSrc(), hdr.merge_id); |
| break; |
| |
| case GmsHeader.MERGE_RSP: |
| View theView = msg.getObject(); |
| merge_data=new MergeData(msg.getSrc(), theView, theView.getMessageDigest()); |
| merge_data.merge_rejected=hdr.merge_rejected; |
| impl.handleMergeResponse(merge_data, hdr.merge_id); |
| break; |
| |
| case GmsHeader.INSTALL_MERGE_VIEW: |
| theView = msg.getObject(); |
| impl.handleMergeView(new MergeData(msg.getSrc(), theView, theView.getMessageDigest()), hdr.merge_id); |
| break; |
| |
| case GmsHeader.CANCEL_MERGE: |
| impl.handleMergeCancelled(hdr.merge_id); |
| break; |
| |
| default: |
| if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_GMSHEADER_WITH_TYPE_0__NOT_KNOWN, hdr.type); |
| } |
| return; // don't pass up |
| |
| case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this ! |
          case Event.DISCONNECT_OK:  // ditto (e.g. sent by TP layer). Don't send up the stack
| return; |
| |
| |
| case Event.SET_LOCAL_ADDRESS: |
| local_addr=(Address)evt.getArg(); |
| // GemStoneAddition - the address must hold the split-brain enabled flag |
| if (local_addr instanceof IpAddress) { |
| ((IpAddress)local_addr).splitBrainEnabled(this.splitBrainDetectionEnabled); |
| ((IpAddress)local_addr).setMemberWeight(this.memberWeight); |
| } |
| notifyOfLocalAddress(local_addr); |
| if(print_local_addr) { |
| System.out.println("\n-------------------------------------------------------\n" + |
| "GMS: address is " + local_addr + |
| "\n-------------------------------------------------------"); |
| } |
| break; // pass up |
| |
| case Event.SUSPECT: |
| SuspectMember sm = (SuspectMember)evt.getArg(); // GemStoneAddition |
| Address suspected=sm.suspectedMember; |
| if (!members.containsExt(suspected)) { |
| break; // not a member so don't process it |
| } |
| view_handler.add(new Request(Request.LEAVE, suspected, true, null, "did not respond to are-you-dead messages")); |
| addSuspectToCollectors(suspected); |
| break; // pass up |
| |
| case Event.FD_SOCK_MEMBER_LEFT_NORMALLY: // GemStoneAddition - pretty much the same as SUSPECT |
| Address addr = (Address)evt.getArg(); |
| view_handler.add(new Request(Request.LEAVE, addr, false, null, "closed connection to FD_SOCK watcher")); |
| addSuspectToCollectors(addr); |
| break; // pass up |
| |
| case Event.UNSUSPECT: |
| suspected = (Address)evt.getArg(); |
| impl.unsuspect(suspected); |
| // GemStoneAddition - do not wait for view acknowledgement |
| // from a member that has proved it is alive. This ensures |
| // that we don't wait inordinately long for a member that |
| // might just be extremely busy. |
| addSuspectToCollectors(suspected); |
| return; // discard |
| |
| case Event.MERGE: |
| view_handler.add(new Request(Request.MERGE, null, false, (Vector)evt.getArg())); |
| return; // don't pass up |
| |
| // GemStoneAddition - passed up from TCPGOSSIP if none of |
| // the locators has a distributed system |
| case Event.ENABLE_INITIAL_COORDINATOR: |
| disable_initial_coord = false; |
| return; |
| |
        // GemStoneAddition - network partition detection
| case Event.FLOATING_COORDINATOR_DISABLED: |
| boolean warning = true; |
| if (!floatingCoordinatorDisabled) { |
| floatingCoordinatorDisabled = true; |
| log.getLogWriter().info( |
| ExternalStrings.GMS_LOCATOR_HAS_DISABLED_FLOATING_MEMBERSHIP_COORDINATION); |
| } else { |
| warning = false; |
| } |
| if (!stack.gfPeerFunctions.hasLocator()) { |
| if (warning) { |
| log.getLogWriter().fine("This member of the distributed system will only be a coordinator if there are no locators available"); |
| } |
| ((IpAddress)local_addr).shouldntBeCoordinator(true); |
| Event newaddr = new Event(Event.SET_LOCAL_ADDRESS, |
| this.local_addr); |
| passUp(newaddr); |
| passDown(newaddr); |
| } |
| else if (warning && log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine( |
| "This VM hosts a locator and is preferred as membership coordinator."); |
| } |
| return; |
| |
| case Event.ENABLE_NETWORK_PARTITION_DETECTION: |
| this.splitBrainDetectionEnabled = true; |
| this.stack.gfPeerFunctions.enableNetworkPartitionDetection(); |
| return; |
| |
| // GemStoneAddition - copied from receiveUpEvent for removal of threading model |
| case Event.GET_DIGEST_OK: |
| digest_promise.setResult(evt.getArg()); |
| return; // don't pass further up |
| |
| } |
| |
| if(impl.handleUpEvent(evt)) |
| passUp(evt); |
| } |
| |
| /** |
| This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs |
| to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer |
| responds with a GET_DIGEST_OK event.<p> |
       However, the GET_DIGEST_OK event would never be processed, because the thread handling the JOIN request cannot
       process the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the
       up-handler thread of the lower protocol and can therefore handle the event. All it does is fulfill the promise
       on which JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then
       discarded so that it is not processed twice.
| */ |
| // @Override // GemStoneAddition |
| // public void receiveUpEvent(Event evt) { |
| // switch(evt.getType()) { |
| // case Event.GET_DIGEST_OK: |
| // digest_promise.setResult(evt.getArg()); |
| // return; // don't pass further up |
| // } |
| // super.receiveUpEvent(evt); |
| // } |
| |
| |
| @Override // GemStoneAddition |
| public void down(Event evt) { |
| switch(evt.getType()) { |
| |
| case Event.CONNECT: |
| RuntimeException joinException = null; // GemStoneAddition |
| passDown(evt); |
| if(local_addr == null) |
| if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null"); |
| joined = false; // GemStoneAddition |
| try { |
| joined = impl.join(local_addr); |
| } |
| catch (ShunnedAddressException e) { // GemStoneAddition |
| joinException = e; |
| } |
| catch (RuntimeException e) { |
| String exClass = e.getClass().getSimpleName(); |
| if (exClass.equals("SystemConnectException") |
| || exClass.equals("AuthenticationFailedException") |
| || exClass.equals("GemFireConfigException")) { // GemStoneAddition |
| joinException = e; |
| } else { |
| throw e; |
| } |
| } |
| if (trace) |
| log.trace("GMS join returned " + joined); |
| if (joined) { |
| // GemStoneAddition - return status from impl |
| Event OK = new Event(Event.CONNECT_OK); |
| passUp(OK); |
| // GemStoneAddition - pass down as well, so lower protocols know we're connected |
| passDown(OK); |
| } |
| else { |
| if (joinException == null) { // GemStoneAddition |
| joinException = stack.gfBasicFunctions.getSystemConnectException("Attempt to connect to distributed system timed out"); |
| } |
| if (log.getLogWriter().fineEnabled()) { |
| log.getLogWriter().fine("Startup is throwing " + joinException); |
| } |
| passUp(new Event(Event.EXIT, joinException)); |
| } |
| if (trace) |
| log.trace("GMS connect completed"); |
| return; // don't pass down: was already passed down |
| |
| case Event.DISCONNECT: |
| Event disconnecting = new Event(Event.DISCONNECTING); // GemStoneAddition - starting to disconnect |
| passUp(disconnecting); |
| passDown(disconnecting); |
| impl.leave((Address)evt.getArg()); |
| this.disconnected = true; |
| passUp(new Event(Event.DISCONNECT_OK)); |
| // bug #44786 - GemFire does not issue connect() after disconnecting a stack. Nulling out the view |
| // variable causes an NPE in the view casting thread if a view is being prepared during shutdown and |
| // can cause NPEs in other places that do not have null checks for this variable |
| //initState(); // in case connect() is called again |
| break; // pass down |
| } |
| |
| // boolean handled = impl.handleDownEvent(evt); GemStoneAddition - this does nothing but slow things down a bit |
| // if(handled) |
| passDown(evt); |
| } |
| |
| |
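    /*
     * Illustrative configuration (the property names are those parsed below; the values shown are
     * examples rather than defaults): a GMS entry in a protocol stack configuration string might
     * look like
     *   pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true;
     *              view_ack_collection_timeout=12500;split-brain-detection=true;
     *              partition-threshold=51;member-weight=0)
     */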
| /** Setup the Protocol instance according to the configuration string */ |
| @Override // GemStoneAddition |
| public boolean setProperties(Properties props) { |
| String str; |
| |
| super.setProperties(props); |
| str=props.getProperty("shun"); |
| if(str != null) { |
| shun=Boolean.valueOf(str).booleanValue(); |
| props.remove("shun"); |
| } |
| |
| str=props.getProperty("merge_leader"); |
| if(str != null) { |
| merge_leader=Boolean.valueOf(str).booleanValue(); |
| props.remove("merge_leader"); |
| } |
| |
| str=props.getProperty("print_local_addr"); |
| if(str != null) { |
| print_local_addr=Boolean.valueOf(str).booleanValue(); |
| props.remove("print_local_addr"); |
| } |
| |
| str=props.getProperty("join_timeout"); // time to wait for JOIN |
| if(str != null) { |
| join_timeout=Long.parseLong(str); |
| props.remove("join_timeout"); |
| } |
| |
| str=props.getProperty("join_retry_timeout"); // time to wait between JOINs |
| if(str != null) { |
| join_retry_timeout=Long.parseLong(str); |
| props.remove("join_retry_timeout"); |
| } |
| |
| str=props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req. |
| if(str != null) { |
| leave_timeout=Long.parseLong(str); |
| props.remove("leave_timeout"); |
| } |
| |
| str=props.getProperty("merge_timeout"); // time to wait for MERGE_RSPS from subgroup coordinators |
| if(str != null) { |
| merge_timeout=Long.parseLong(str); |
| props.remove("merge_timeout"); |
| } |
| |
| str=props.getProperty("digest_timeout"); // time to wait for GET_DIGEST_OK from PBCAST |
| if(str != null) { |
| digest_timeout=Long.parseLong(str); |
| props.remove("digest_timeout"); |
| } |
| |
| str=props.getProperty("view_ack_collection_timeout"); |
| if(str != null) { |
| view_ack_collection_timeout=Long.parseLong(str); |
| props.remove("view_ack_collection_timeout"); |
| } |
| |
| str=props.getProperty("resume_task_timeout"); |
| if(str != null) { |
| resume_task_timeout=Long.parseLong(str); |
| props.remove("resume_task_timeout"); |
| } |
| |
| str=props.getProperty("disable_initial_coord"); |
| if(str != null) { |
| disable_initial_coord=Boolean.valueOf(str).booleanValue(); |
| props.remove("disable_initial_coord"); |
| } |
| // GemStoneAddition - InternalLocator needs this |
| if (Boolean.getBoolean("p2p.enableInitialCoordinator")) { |
| disable_initial_coord = false; |
| } |
| |
| //GemStoneAddition - split-brain detection support |
| str=props.getProperty("split-brain-detection"); |
| if (str != null) { |
| splitBrainDetectionEnabled = Boolean.valueOf(str).booleanValue(); |
| props.remove("split-brain-detection"); |
| } |
| str=props.getProperty("partition-threshold"); |
| int l = 51; |
| if (str != null) { |
| l = Integer.parseInt(str); |
| props.remove("partition-threshold"); |
| } |
| this.partitionThreshold = l; |
| |
| str=props.getProperty("member-weight"); |
| l = 0; |
| if (str != null) { |
| l = Integer.parseInt(str); |
| props.remove("member-weight"); |
| } |
| this.memberWeight = l; |
| |
| str=props.getProperty("handle_concurrent_startup"); |
| if(str != null) { |
| handle_concurrent_startup=Boolean.valueOf(str).booleanValue(); |
| props.remove("handle_concurrent_startup"); |
| } |
| |
| str=props.getProperty("num_prev_mbrs"); |
| if(str != null) { |
| num_prev_mbrs=Integer.parseInt(str); |
| props.remove("num_prev_mbrs"); |
| } |
| |
| if(props.size() > 0) { |
| log.error(ExternalStrings.GMS_GMSSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); |
| |
| return false; |
| } |
| return true; |
| } |
| |
| |
| |
| /* ------------------------------- Private Methods --------------------------------- */ |
| |
| void initState() { |
| becomeClient(); |
| view_id=null; |
| view=null; |
| leader = null; // GemStoneAddition |
| } |
| |
| /** |
| * GemStoneAddition - notify interested parties that we've selected an initial |
| * coordinator and joined the group |
| */ |
| protected void notifyOfCoordinator(Address coord) { |
| GossipServer gs = GossipServer.getInstance(); |
| if (gs != null) { |
| gs.setCoordinator(coord); |
| } |
| } |
| |
| /** |
| * GemStoneAddition - notify interested parties that we've selected an initial |
| * coordinator and joined the group |
| */ |
| protected void notifyOfLocalAddress(Address localAddress) { |
| GossipServer gs = GossipServer.getInstance(); |
| if (gs != null) { |
| gs.setLocalAddress(localAddress); |
| } |
| } |
| |
| /* --------------------------- End of Private Methods ------------------------------- */ |
| |
| |
| |
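  /**
   * Header attached to GMS control messages; the type field selects the request or response being
   * carried (JOIN_REQ, VIEW, VIEW_ACK, ...). Usage sketch (illustrative only; "coord" is an assumed
   * coordinator address, and name/local_addr/passDown belong to the enclosing GMS protocol):
   * <pre>
   *   Message join_req = new Message(coord, null, null);
   *   join_req.putHeader(name, new GmsHeader(GmsHeader.JOIN_REQ, local_addr));
   *   passDown(new Event(Event.MSG, join_req));
   * </pre>
   */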
| public static class GmsHeader extends Header implements Streamable { |
| public static final byte JOIN_REQ=1; |
| public static final byte JOIN_RSP=2; |
| public static final byte LEAVE_REQ=3; |
| public static final byte LEAVE_RSP=4; |
| public static final byte VIEW=5; |
| public static final byte MERGE_REQ=6; |
| public static final byte MERGE_RSP=7; |
| public static final byte INSTALL_MERGE_VIEW=8; |
| public static final byte CANCEL_MERGE=9; |
| public static final byte VIEW_ACK=10; |
| public static final byte GET_VIEW=11; // GemStoneAddition |
| public static final byte GET_VIEW_RSP=12; // GemStoneAddition |
| public static final byte REMOVE_REQ=13; // GemStoneAddition - member removal req |
| public static final byte PREPARE_FOR_VIEW=14; // GemStoneAddition - network partition detection |
| public static final byte PREPARE_FOR_VIEW_ACK=15; // GemStoneAddition - network partition detection |
| public static final byte NETWORK_PARTITION_DETECTED=16; // GemStoneAddition - network partition detection |
| |
| byte type=0; |
| // private View view=null; // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW |
| Address mbr=null; // used when type=JOIN_REQ or LEAVE_REQ |
| // JoinRsp join_rsp=null; // used when type=JOIN_RSP |
| // Digest my_digest=null; // used when type=MERGE_RSP or INSTALL_MERGE_VIEW |
| ViewId merge_id=null; // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE |
| boolean merge_rejected=false; // used when type=MERGE_RSP |
| String arg = ""; // GemStoneAddition |
| boolean forcedOut; // GemStoneAddition for LEAVE_RSP after REMOVE_REQ |
| |
| |
| public GmsHeader() { |
| } // used for Externalization |
| |
| public GmsHeader(byte type) { |
| this.type=type; |
| } |
| |
| |
| /** Used for VIEW header */ |
| // public GmsHeader(byte type, View view) { |
| // this.type=type; |
| // this.view=view; |
| // } |
| |
| |
| /** Used for JOIN_REQ or LEAVE_REQ header */ |
| public GmsHeader(byte type, Address mbr) { |
| this.type=type; |
| this.mbr=mbr; |
| } |
| |
| /** GemStoneAddition - for REMOVE_REQ */ |
| public GmsHeader(byte type, Address mbr, String reason) { |
| this.type=type; |
| this.mbr=mbr; |
| this.arg = reason; |
| } |
| |
| /** GemStoneAddition for LEAVE_RSP sent from a REMOVE_REQ */ |
| public GmsHeader(byte type, boolean forcedOut, String reason, Address mbr) { |
| this.type = type; |
| this.forcedOut = forcedOut; |
| this.arg = reason; |
| this.mbr = mbr; |
| } |
| |
| // /** Used for JOIN_RSP header */ |
| // public GmsHeader(byte type, JoinRsp join_rsp) { |
| // this.type=type; |
| // this.join_rsp=join_rsp; |
| // } |
| |
| public byte getType() { |
| return type; |
| } |
| |
| public Address getMember() { |
| return mbr; |
| } |
| |
| public String getArg() { // GemStoneAddition |
| return this.arg; |
| } |
| |
| // GemStoneAddition |
| // public View getView() { |
| // return view; |
| // } |
| |
| @Override // GemStoneAddition |
| public String toString() { |
| StringBuffer sb=new StringBuffer("GmsHeader"); |
| sb.append('[' + type2String(type) + ']'); |
| switch(type) { |
| case JOIN_REQ: |
| sb.append(": mbr=" + mbr); |
| break; |
| |
| case JOIN_RSP: |
| case GET_VIEW_RSP: // GemStoneAddition - for becomeGroupCoordinator() |
| // sb.append(": join_rsp=" + join_rsp); |
| break; |
| |
| case LEAVE_REQ: |
| sb.append(": mbr=" + mbr); |
| break; |
| |
| case LEAVE_RSP: |
| // GemStoneAddition - forcedOut & arg |
| if (forcedOut) { |
| sb.append(": forced removal because ").append(this.arg); |
| } |
| break; |
| |
| case VIEW: |
| case VIEW_ACK: |
| // sb.append(": view=" + view); |
| break; |
| |
| case MERGE_REQ: |
| sb.append(": merge_id=" + merge_id); |
| break; |
| |
| case MERGE_RSP: |
| sb.append(/*": view=" + view + " digest=" + my_digest +*/ ", merge_rejected=" + merge_rejected + |
| ", merge_id=" + merge_id); |
| break; |
| |
| case INSTALL_MERGE_VIEW: |
| // sb.append(/*": view=" + view + ", digest=" + my_digest*/); |
| break; |
| |
| case CANCEL_MERGE: |
| sb.append(", <merge cancelled>, merge_id=" + merge_id); |
| break; |
| |
| case REMOVE_REQ: |
| sb.append(": mbr=" + mbr + ", arg=" + this.arg); // GemStoneAddition |
| break; |
| } |
| return sb.toString(); |
| } |
| |
| |
| public static String type2String(int type) { |
| switch(type) { |
| case JOIN_REQ: return "JOIN_REQ"; |
| case JOIN_RSP: return "JOIN_RSP"; |
| case LEAVE_REQ: return "LEAVE_REQ"; |
| case LEAVE_RSP: return "LEAVE_RSP"; |
| case VIEW: return "VIEW"; |
| case MERGE_REQ: return "MERGE_REQ"; |
| case MERGE_RSP: return "MERGE_RSP"; |
| case INSTALL_MERGE_VIEW: return "INSTALL_MERGE_VIEW"; |
| case CANCEL_MERGE: return "CANCEL_MERGE"; |
| case VIEW_ACK: return "VIEW_ACK"; |
| case REMOVE_REQ: return "REMOVE_REQ"; |
| case GET_VIEW: return "GET_VIEW"; |
| case GET_VIEW_RSP: return "GET_VIEW_RSP"; |
| case PREPARE_FOR_VIEW: return "PREPARE_FOR_VIEW"; |
| case PREPARE_FOR_VIEW_ACK: return "PREPARE_FOR_VIEW_ACK"; |
| case NETWORK_PARTITION_DETECTED: return "NETWORK_PARTITION_DETECTED"; |
| default: return "<unknown>"; |
| } |
| } |
| |
| |
| public void writeExternal(ObjectOutput out) throws IOException { |
| out.writeByte(type); |
| // out.writeObject(view); |
| out.writeObject(mbr); |
| // out.writeObject(join_rsp); |
| // out.writeObject(my_digest); |
| out.writeObject(merge_id); |
| out.writeBoolean(merge_rejected); |
| out.writeBoolean(this.forcedOut); // GemStoneAddition |
| out.writeObject(this.arg); // GemStoneAddition |
| } |
| |
| |
| public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| type=in.readByte(); |
| // view=(View)in.readObject(); |
| mbr=(Address)in.readObject(); |
| // join_rsp=(JoinRsp)in.readObject(); |
| // my_digest=(Digest)in.readObject(); |
| merge_id=(ViewId)in.readObject(); |
| merge_rejected=in.readBoolean(); |
| this.forcedOut = in.readBoolean(); // GemStoneAddition |
| this.arg = (String)in.readObject(); // GemStoneAddition |
| } |
| |
| |
| public void writeTo(DataOutputStream out) throws IOException { |
| out.writeByte(type); |
| // boolean isMergeView=view != null && view instanceof MergeView; |
| // out.writeBoolean(isMergeView); |
| // Util.writeStreamable(view, out); |
| Util.writeAddress(mbr, out); |
| // Util.writeStreamable(join_rsp, out); |
| // Util.writeStreamable(my_digest, out); |
| Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId |
| out.writeBoolean(merge_rejected); |
| out.writeBoolean(this.forcedOut); // GemStoneAddition |
| if (this.arg == null) { // GemStoneAddition arg |
| out.writeBoolean(false); |
| } |
| else { |
| out.writeBoolean(true); |
| out.writeUTF(this.arg); // GemStoneAddition |
| } |
| } |
| |
| public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { |
| type=in.readByte(); |
| // boolean isMergeView=in.readBoolean(); |
| // if(isMergeView) |
| // view=(View)Util.readStreamable(MergeView.class, in); |
| // else |
| // view=(View)Util.readStreamable(View.class, in); |
| mbr=Util.readAddress(in); |
| // join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in); |
| // my_digest=(Digest)Util.readStreamable(Digest.class, in); |
| merge_id=(ViewId)Util.readStreamable(ViewId.class, in); |
| merge_rejected=in.readBoolean(); |
| this.forcedOut = in.readBoolean(); // GemStoneAddition |
| boolean hasArg = in.readBoolean(); // GemStoneAddition |
| if (hasArg) { |
| this.arg = in.readUTF(); // GemStoneAddition |
| } |
| } |
| |
| @Override // GemStoneAddition |
| public long size(short version) { |
| long retval=Global.BYTE_SIZE *2; // type + merge_rejected |
| |
| retval+=Global.BYTE_SIZE; // presence view |
| retval+=Global.BYTE_SIZE; // MergeView or View |
| // if(view != null) |
| // retval+=view.serializedSize(version); |
| |
| retval+=Util.size(mbr,version); |
| |
| // retval+=Global.BYTE_SIZE; // presence of join_rsp |
| // if(join_rsp != null) |
| // retval+=join_rsp.serializedSize(version); |
| |
| // retval+=Global.BYTE_SIZE; // presence for my_digest |
| // if(my_digest != null) |
| // retval+=my_digest.serializedSize(version); |
| |
| retval+=Global.BYTE_SIZE; // presence for merge_id |
| if(merge_id != null) |
| retval+=merge_id.serializedSize(version); |
| |
| if (this.arg != null) { |
| retval += this.arg.length(); // GemStoneAddition - arg string |
| } |
| retval += Global.BYTE_SIZE; // GemStoneAddition - forcedOut |
| return retval; |
| } |
| |
| } |
| |
| |
| |
| |
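  /** A queued membership change request (JOIN, LEAVE, SUSPECT, MERGE or VIEW) processed by the ViewHandler. */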
| public static class Request { |
| static final int JOIN = 1; |
| static final int LEAVE = 2; |
| static final int SUSPECT = 3; |
| static final int MERGE = 4; |
| static final int VIEW = 5; |
| |
| |
| int type=-1; |
| Address mbr=null; |
| boolean suspected; |
| Vector coordinators=null; |
| View view=null; |
| Digest digest=null; |
| List target_members=null; |
| String reason; // GemStoneAddition |
| |
| Request(int type) { |
| this.type=type; |
| } |
| |
| Request(int type, Address mbr, boolean suspected, Vector coordinators) { |
| this.type=type; |
| this.mbr=mbr; |
| this.suspected=suspected; |
| this.coordinators=coordinators; |
| } |
| |
| /** GemStoneAddition - carry the reason with a leave req */ |
| Request(int type, Address mbr, boolean suspected, Vector coordinators, String reason) { |
| this.type=type; |
| this.mbr=mbr; |
| this.suspected=suspected; |
| this.coordinators=coordinators; |
| this.reason = reason; |
| } |
| |
| @Override // GemStoneAddition |
| public String toString() { |
| switch(type) { |
| case JOIN: return "JOIN(" + mbr + ")"; |
| case LEAVE: return "LEAVE(" + mbr + ", " + suspected + ")"; |
| case SUSPECT: return "SUSPECT(" + mbr + ")"; |
| case MERGE: return "MERGE(" + coordinators + ")"; |
| case VIEW: return "VIEW (" + view.getVid() + ")"; |
| } |
            return "<invalid (type=" + type + ")>";
| } |
| } |
| |
| |
| |
| |
| /** |
| * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order |
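     * Requests are typically added from the protocol's event handlers, e.g. (as in the JOIN_REQ handling above):
     *   view_handler.add(new Request(Request.JOIN, mbr, false, null));
     * Bundled JOIN/LEAVE/suspect requests are then handed to the current GmsImpl in a single batch.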
| * @author Bela Ban |
| * @version $Id: GMS.java,v 1.49 2005/12/23 14:57:06 belaban Exp $ |
| */ |
| class ViewHandler implements Runnable { |
| Thread viewThread; // GemStoneAddition -- accesses synchronized on this |
| com.gemstone.org.jgroups.util.Queue q=new com.gemstone.org.jgroups.util.Queue(); // Queue<Request> |
| boolean suspended=false; |
| final static long INTERVAL=5000; |
| private static final long MAX_COMPLETION_TIME=10000; |
| /** Maintains a list of the last 20 requests */ |
| private final BoundedList history=new BoundedList(20); |
| |
| /** Map<Object,TimeScheduler.CancellableTask>. Keeps track of Resumer tasks which have not fired yet */ |
| private final Map resume_tasks=new HashMap(); |
| private Object merge_id=null; |
| |
| |
| void add(Request req) { |
| add(req, false, false); |
| } |
| |
| synchronized void add(Request req, boolean at_head, boolean unsuspend) { |
| if(suspended && !unsuspend) { |
| log.warn("queue is suspended; request " + req + " is discarded"); |
| return; |
| } |
| start(unsuspend); |
| try { |
| if(at_head) |
| q.addAtHead(req); |
| else |
| q.add(req); |
| history.add(new Date() + ": " + req.toString()); |
| } |
| catch(QueueClosedException e) { |
| if(trace) |
| log.trace("queue is closed; request " + req + " is discarded"); |
| } |
| } |
| |
| |
| synchronized void waitUntilCompleted(long timeout) { |
| if(viewThread != null) { |
| try { |
| viewThread.join(timeout); |
| } |
| catch(InterruptedException e) { |
| Thread.currentThread().interrupt(); // GemStoneAddition |
| } |
| } |
| } |
| |
| /** |
         * Waits until the current request has been processed, then clears the queue and discards new
         * requests from now on
| */ |
| public synchronized void suspend(Object m_id) { |
| if(suspended) |
| return; |
| suspended=true; |
| this.merge_id=m_id; |
| q.clear(); |
| waitUntilCompleted(MAX_COMPLETION_TIME); |
| q.close(true); |
| if(trace) |
| log.trace("suspended ViewHandler"); |
| Resumer r=new Resumer(resume_task_timeout, m_id, resume_tasks, this); |
| resume_tasks.put(m_id, r); |
| timer.add(r); |
| } |
| |
| |
| public synchronized void resume(Object m_id) { |
| if(!suspended) |
| return; |
| boolean same_merge_id=this.merge_id != null && m_id != null && this.merge_id.equals(m_id); |
| same_merge_id=same_merge_id || (this.merge_id == null && m_id == null); |
| |
| if(!same_merge_id) { |
| if(warn) |
| log.warn("resume(" +m_id+ ") does not match " + this.merge_id + ", ignoring resume()"); |
| return; |
| } |
| synchronized(resume_tasks) { |
| TimeScheduler.CancellableTask task=(TimeScheduler.CancellableTask)resume_tasks.get(m_id); |
| if(task != null) { |
| task.cancel(); |
| resume_tasks.remove(m_id); |
| } |
| } |
| resumeForce(); |
| } |
| |
| public synchronized void resumeForce() { |
| if(q.closed()) |
| q.reset(); |
| suspended=false; |
| if(trace) |
| log.trace("resumed ViewHandler"); |
| } |
| |
| public void run() { |
| Request req; |
| for (;;) { // GemStoneAddition - remove coding anti-pattern |
| if (q.closed()) break; // GemStoneAddition |
| if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition |
| try { |
| req=(Request)q.remove(INTERVAL); // throws a TimeoutException if it runs into timeout |
| process(req); |
| if (Thread.currentThread().isInterrupted()) { // GemStoneAddition |
| return; |
| } |
| } |
| catch(QueueClosedException e) { |
| break; |
| } |
| catch(TimeoutException e) { |
| // if (trace) { |
| // log.trace("ViewHandler queue timeout - retrying"); |
| // } |
| continue; |
| //break; GemStoneAddition: fix for bug #42009. don't exit the view handler so hastily |
| } |
| } |
| if (trace) { |
| log.trace("ViewHandler is exiting"); |
| } |
| } |
| |
| public int size() { |
| return q.size(); |
| } |
| public boolean suspended() {return suspended;} |
| public String dumpQueue() { |
| StringBuffer sb=new StringBuffer(); |
| List v=q.values(); |
| for(Iterator it=v.iterator(); it.hasNext();) { |
| sb.append(it.next() + "\n"); |
| } |
| return sb.toString(); |
| } |
| |
| public String dumpHistory() { |
| StringBuffer sb=new StringBuffer(); |
| for(Enumeration en=history.elements(); en.hasMoreElements();) { |
| sb.append(en.nextElement() + "\n"); |
| } |
| return sb.toString(); |
| } |
| |
| private void process(Request reqp) { |
| //log.getLogWriter().info("gms viewhandler processing " + req); // debugging |
| List joinReqs = new ArrayList(); |
| List leaveReqs = new ArrayList(); |
| List suspectReqs = new ArrayList(); |
| List suspectReasons = new ArrayList(); |
| boolean loop = true; |
| long waittime = BUNDLE_WAITTIME; // time to wait for additional JOINs |
| long starttime = System.currentTimeMillis(); |
| Request req = reqp; |
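            // Bundle additional JOIN/LEAVE requests arriving within BUNDLE_WAITTIME so that several
            // membership changes can be reflected in a single new view rather than one view apiece.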
| while (loop) { |
| if(trace) |
| log.trace("processing " + req); |
| switch(req.type) { |
| case Request.JOIN: { |
| if (!joinReqs.contains(req.mbr)) { // GemStoneAddition - duplicate check |
| joinReqs.add(req.mbr); |
| } |
| // impl.handleJoin(joinReqs); |
| break; |
| } |
| case Request.LEAVE: |
| if(req.suspected) { |
| // impl.suspect(req.mbr, req.reason); |
| if (!suspectReqs.contains(req.mbr)) { |
| suspectReqs.add(req.mbr); |
| suspectReasons.add(req.reason); |
| } |
| } |
| else { |
| if (!leaveReqs.contains(req.mbr)) { |
| leaveReqs.add(req.mbr); |
| } |
| } |
| break; |
| // case Request.SUSPECT: GemStoneAddition: this is no longer used |
| // impl.suspect(req.mbr); |
| // loop = false; |
| // break; |
| case Request.MERGE: |
| impl.merge(req.coordinators); |
| loop = false; |
| break; |
| case Request.VIEW: |
| boolean mcast = stack.gfPeerFunctions.getMcastPort() > 0; // GemStoneAddition |
| castViewChangeWithDest(req.view, req.digest, req.target_members, mcast); |
| loop = false; |
| break; |
| default: |
| log.error(ExternalStrings.GMS_REQUEST__0__IS_UNKNOWN_DISCARDED, req.type); |
| loop = false; |
| } |
| if (waittime <= 0) { |
| loop = false; |
| } |
| if (loop) { |
| boolean ignoreTimeout = false; |
| try { |
| Request addl = (Request)q.peek(waittime); |
| if (addl.type == Request.JOIN || addl.type == Request.LEAVE) { |
| try { |
| req = (Request)q.remove(); |
| ignoreTimeout = true; |
| } |
| catch (InterruptedException e) { |
| // Thread.interrupt(); |
| |
| // Disregard this. We are only interrupted at |
| // time of closing, at which point the queue |
| // has been closed and flushed. We process |
| // the remaining messages and then quit.... |
| } |
| } |
| } |
| catch (TimeoutException e) { |
| // no more events. Finish processing. |
| loop = ignoreTimeout; |
| } |
| catch (QueueClosedException e) { |
| // We've been stopped. |
| loop = ignoreTimeout; |
| } |
| waittime -= (System.currentTimeMillis() - starttime); |
| if (!ignoreTimeout && waittime <= 0) { |
| loop = false; |
| } |
| } // if (loop) |
| } // while (loop) |
| if (!joinReqs.isEmpty() || !leaveReqs.isEmpty() || !suspectReqs.isEmpty()) { |
| impl.handleJoinsAndLeaves(joinReqs, leaveReqs, suspectReqs, suspectReasons, false/*forceInclusion*/); |
| } |
| } |
| |
| synchronized void start(boolean unsuspend) { |
| if(q.closed()) |
| q.reset(); |
| if(unsuspend) { |
| suspended=false; |
| synchronized(resume_tasks) { |
| TimeScheduler.CancellableTask task=(TimeScheduler.CancellableTask)resume_tasks.get(merge_id); |
| if(task != null) { |
| task.cancel(); |
| resume_tasks.remove(merge_id); |
| } |
| } |
| } |
| merge_id=null; |
| if(viewThread == null || !viewThread.isAlive()) { |
| viewThread=new Thread(GemFireTracer.GROUP, this, "ViewHandler"); |
| viewThread.setDaemon(true); |
| viewThread.start(); |
| if(trace) |
| log.trace("ViewHandler started"); |
| } |
| } |
| |
| synchronized void stop(boolean flush) { |
| q.close(flush); |
| if (viewThread != null && viewThread.isAlive()) viewThread.interrupt(); // GemStoneAddition |
| viewThread = null; // GemStoneAddition |
| TimeScheduler.CancellableTask task; |
| synchronized(resume_tasks) { |
| for(Iterator it=resume_tasks.values().iterator(); it.hasNext();) { |
| task=(TimeScheduler.CancellableTask)it.next(); |
| task.cancel(); |
| } |
| resume_tasks.clear(); |
| } |
| merge_id=null; |
| resumeForce(); |
| } |
| } |
| |
| |
| /** |
     * Resumer is a second line of defense: when the ViewHandler is suspended, it will be resumed when the current
     * merge is cancelled or when the merge completes. If neither ever happens (which should not be the case), the
     * Resumer nevertheless resumes the ViewHandler. This strategy was chosen because the ViewHandler is critical:
     * if it stayed suspended indefinitely, new JOIN requests could not be processed. The Resumer exists for peace
     * of mind and will most likely never be needed.
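     * <p>
     * A Resumer is scheduled from ViewHandler.suspend(), e.g. (as done above):
     * <pre>
     *   Resumer r = new Resumer(resume_task_timeout, m_id, resume_tasks, this);
     *   resume_tasks.put(m_id, r);
     *   timer.add(r);
     * </pre>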
| */ |
| static class Resumer implements TimeScheduler.CancellableTask { |
| boolean cancelled=false; |
| long interval; |
| final Object token; |
| final Map tasks; |
| final ViewHandler handler; |
| |
| |
| public Resumer(long interval, final Object token, final Map t, final ViewHandler handler) { |
| this.interval=interval; |
| this.token=token; |
| this.tasks=t; |
| this.handler=handler; |
| } |
| |
| public void cancel() { |
| cancelled=true; |
| } |
| |
| public boolean cancelled() { |
| return cancelled; |
| } |
| |
| public long nextInterval() { |
| return interval; |
| } |
| |
| public void run() { |
| TimeScheduler.CancellableTask t; |
| boolean execute=true; |
| synchronized(tasks) { |
| t=(TimeScheduler.CancellableTask)tasks.get(token); |
| if(t != null) { |
| t.cancel(); |
| execute=true; |
| } |
| else { |
| execute=false; |
| } |
| tasks.remove(token); |
| } |
| if(execute) { |
| handler.resume(token); |
| } |
| } |
| } |
| |
| |
| |
| |
| } |