/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.distributed.internal.membership.jgroup;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.NotSerializableException;
import java.net.DatagramSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.ForcedDisconnectException;
import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.ToDataException;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.util.BoundedLinkedHashMap;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DSClock;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionException;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
import com.gemstone.gemfire.distributed.internal.SizeableRunnable;
import com.gemstone.gemfire.distributed.internal.StartupMessage;
import com.gemstone.gemfire.distributed.internal.ThrottlingMemLinkedQueueWithDMStats;
import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ClassPathLoader;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
import com.gemstone.gemfire.internal.cache.DirectReplyMessage;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.xmlcache.BridgeServerCreation;
import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.shared.StringPrintWriter;
import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
import com.gemstone.gemfire.internal.tcp.ConnectionException;
import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
import com.gemstone.gemfire.internal.tcp.Stub;
import com.gemstone.gemfire.internal.tcp.TCPConduit;
import com.gemstone.gemfire.internal.util.Breadcrumbs;
import com.gemstone.org.jgroups.Address;
import com.gemstone.org.jgroups.Channel;
import com.gemstone.org.jgroups.ChannelClosedException;
import com.gemstone.org.jgroups.ChannelNotConnectedException;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.Message;
import com.gemstone.org.jgroups.Receiver;
import com.gemstone.org.jgroups.ShunnedAddressException;
import com.gemstone.org.jgroups.SuspectMember;
import com.gemstone.org.jgroups.View;
import com.gemstone.org.jgroups.debug.JChannelTestHook;
import com.gemstone.org.jgroups.oswego.concurrent.Latch;
import com.gemstone.org.jgroups.protocols.FD;
import com.gemstone.org.jgroups.protocols.FD_SOCK;
import com.gemstone.org.jgroups.protocols.TCP;
import com.gemstone.org.jgroups.protocols.TP;
import com.gemstone.org.jgroups.protocols.UDP;
import com.gemstone.org.jgroups.protocols.VERIFY_SUSPECT;
import com.gemstone.org.jgroups.protocols.pbcast.GMS;
import com.gemstone.org.jgroups.protocols.pbcast.NAKACK;
import com.gemstone.org.jgroups.spi.GFBasicAdapter;
import com.gemstone.org.jgroups.spi.GFPeerAdapter;
import com.gemstone.org.jgroups.stack.GossipServer;
import com.gemstone.org.jgroups.stack.IpAddress;
import com.gemstone.org.jgroups.stack.ProtocolStack;
import com.gemstone.org.jgroups.util.GemFireTracer;
public class JGroupMembershipManager implements MembershipManager
{
private static final Logger logger = LogService.getLogger();
/** true when some members are running an older product version, causing multicast to be disabled during a rolling upgrade */
volatile boolean disableMulticastForRollingUpgrade;
/**
* set to true if the distributed system that created this manager was
* auto-reconnecting when it was created.
*/
boolean wasReconnectingSystem;
/**
* A quorum checker is created during reconnect and is held
* here so it is available to the UDP protocol for passing off
* the ping-pong responses used in the quorum-checking algorithm.
*/
private volatile QuorumCheckerImpl quorumChecker;
/**
* during an auto-reconnect attempt set this to the old DistributedSystem's
* UDP membership port socket. The UDP protocol will pick it up and use it.
*/
private volatile DatagramSocket oldDSMembershipSocket;
/**
* thread-local used to force use of JGroups for communications, usually to
* avoid deadlock when conserve-sockets=true. Use of this should be removed
* when connection pools are implemented in the direct-channel
*/
private ThreadLocal<Boolean> forceUseJGroups = new ThreadLocal<Boolean>();
/**
* A general use timer
*/
private Timer timer = new Timer("Membership Timer", true);
static {
// this system property has been available since GemFire 2.0 to turn
// tracing on in JGroups (formerly known as JavaGroups)
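// (e.g. start the JVM with -DDistributionManager.DEBUG_JAVAGROUPS=true to enable tracing)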
boolean b = Boolean.getBoolean("DistributionManager.DEBUG_JAVAGROUPS");
if ( b ) {
GemFireTracer.DEBUG = true;
}
}
/**
* Trick class to make the startup synch more
* visible in stack traces
*
* @see JGroupMembershipManager#startupLock
* @author jpenney
*
*/
static class EventProcessingLock {
public EventProcessingLock() {
}
}
/**
* Trick class to make the view lock more visible
* in stack traces
*
* @author jpenney
*
*/
static class ViewLock {
public ViewLock() {
}
}
static class StartupEvent {
static final int DEPARTURE = 1;
static final int CONNECT = 2;
static final int VIEW = 3;
static final int MESSAGE = 4;
/**
* indicates whether the event is a departure, a surprise connect
* (i.e., before the view message arrived), a view, or a regular
* message
*
* @see #DEPARTURE
* @see #CONNECT
* @see #VIEW
* @see #MESSAGE
*/
private int kind;
// Miscellaneous state depending on the kind of event
InternalDistributedMember member;
boolean crashed;
String reason;
DistributionMessage dmsg;
Stub stub;
View jgView;
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("kind=");
switch (kind) {
case DEPARTURE:
sb.append("departure; member = <")
.append(member)
.append(">; crashed = ")
.append(crashed)
.append("; reason = ");
if (reason != null && (reason.indexOf("NoSuchMemberException") >= 0)) {
sb.append(LocalizedStrings.JGroupMembershipManager_TCPIP_CONNECTIONS_CLOSED.toLocalizedString());
}
else {
sb.append(reason);
}
break;
case CONNECT:
sb.append("connect; member = <" + member + ">; stub = " + stub);
break;
case VIEW:
String text = DistributionManager.printView(
viewToMemberView(jgView));
sb.append("view <" + text + ">");
break;
case MESSAGE:
sb.append("message <" + dmsg + ">");
break;
default:
sb.append("unknown=<" + kind + ">");
break;
}
return sb.toString();
}
/**
* Create a departure event
* @param dm the member that left
* @param crashed true if this member crashed
* @param reason reason string, esp. if crashed
*/
StartupEvent(InternalDistributedMember dm, boolean crashed, String reason) {
this.kind = DEPARTURE;
this.member = dm;
this.crashed = crashed;
this.reason = reason;
}
/**
* Indicate if this is a departure
* @return true if this is a departure event
*/
boolean isDepartureEvent() {
return this.kind == DEPARTURE;
}
/**
* Create a connect event
* @param member the member connecting
* @param id the stub
*/
StartupEvent(final InternalDistributedMember member, final Stub id) {
this.kind = CONNECT;
this.member = member;
this.stub = id;
}
/**
* Indicate if this is a connect event
* @return true if this is a connect event
*/
boolean isConnect() {
return this.kind == CONNECT;
}
/**
* Create a view event
* @param v the new view
*/
StartupEvent(View v) {
this.kind = VIEW;
this.jgView = v;
}
/**
* Indicate if this is a view event
* @return true if this is a view event
*/
boolean isJgView() {
return this.kind == VIEW;
}
/**
* Create a message event
* @param d the message
*/
StartupEvent(DistributionMessage d) {
this.kind = MESSAGE;
this.dmsg = d;
}
/**
* Indicate if this is a message event
* @return true if this is a message event
*/
boolean isDistributionMessage() {
return this.kind == MESSAGE;
}
}
/**
* The system property that specifies the name of a file from which to read
* Jgroups configuration information
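* (set with -DDistributionManager.JAVAGROUPS_CONFIG=path-to-config-file)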
*/
public static final String JAVAGROUPS_CONFIG = System
.getProperty("DistributionManager.JAVAGROUPS_CONFIG");
/**
* The location (in the product) of the locator Jgroups config file.
*/
private static final String DEFAULT_JAVAGROUPS_TCP_CONFIG = "com/gemstone/gemfire/distributed/internal/javagroups-config.txt";
/**
* The location (in the product) of the mcast Jgroups config file.
*/
private static final String DEFAULT_JAVAGROUPS_MCAST_CONFIG = "com/gemstone/gemfire/distributed/internal/javagroups-mcast.txt";
/** is multicast being used for group discovery (as opposed to locators)? */
private static boolean isMcastDiscovery = false;
/** is use of multicast enabled at all? */
private static boolean isMcastEnabled = false;
/** Sometimes the jgroups channel is blocked until unicast messages
are all ack'd. This boolean disables that blocking and can
increase performance in situations where it's okay to deliver
point-to-point and multicast messages out of order
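(flushing can be re-enabled by setting -Dp2p.enableUcastFlush=true)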
*/
public final static boolean DISABLE_UCAST_FLUSH = !Boolean.getBoolean("p2p.enableUcastFlush");
private int membershipCheckTimeout = DistributionConfig.DEFAULT_SECURITY_PEER_VERIFYMEMBER_TIMEOUT;
// /** FOR TESTING ONLY:
// This variable forces most distributed cache operations to be
// multicast if multicast is available. See DistributionConfigImpl's
// multicastTest variable for forcing availability of multicast. */
// public final static boolean multicastTest = false;
/**
* This is the underlying Jgroups channel
*/
protected volatile JChannel channel;
/**
* This object synchronizes threads waiting for
* startup to finish. Updates to {@link #startupMessages}
* are synchronized through this object.
*/
protected final EventProcessingLock startupLock = new EventProcessingLock();
/**
* This is the latest view (ordered list of DistributedMembers)
* that has been returned from Jgroups
*
* All accesses to this object are protected via {@link #latestViewLock}
*/
protected NetView latestView = new NetView();
/**
* This is the lock for protecting access to latestView
*
* @see #latestView
*/
protected ViewLock latestViewLock = new ViewLock();
/**
* This is the listener that accepts our membership events
*/
protected com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener listener;
/**
* Membership failure listeners - for testing
*/
List membershipTestHooks;
/**
* Channel test hook
*/
protected JChannelTestHook channelTestHook;
/**
* This is a representation of the local member (ourself)
*/
protected InternalDistributedMember myMemberId = null; // new DistributedMember(-1);
protected DirectChannel directChannel;
protected TCPConduit conduit;
protected MyDCReceiver dcReceiver;
/** the configuration for the distributed system */
private DistributionConfig dconfig;
volatile boolean isJoining;
/** has the jgroups channel been connected successfully? */
volatile boolean hasConnected;
/**
* a map keyed on InternalDistributedMember, values are Stubs that represent direct
* channels to other systems
*
* Accesses must be synchronized via {@link #latestViewLock}.
*/
protected final Map memberToStubMap = new ConcurrentHashMap();
/**
* a map of direct channels (Stub) to InternalDistributedMember. key instanceof Stub
* value instanceof InternalDistributedMember
*
* Accesses must be synchronized via {@link #latestViewLock}.
*/
protected final Map stubToMemberMap = new ConcurrentHashMap();
/**
* a map of jgroups addresses (IpAddress) to InternalDistributedMember that
* is used to avoid creating new IDMs for every jgroups message when the
* direct-channel is disabled
*/
private final Map ipAddrToMemberMap = new ConcurrentHashMap();
/**
* Members of the distributed system that we believe have shut down.
* Keys are instances of {@link InternalDistributedMember}, values are
* Longs indicating the time this member was shunned.
*
* Members are removed after {@link #SHUNNED_SUNSET} seconds have
* passed.
*
* Accesses to this list need to be synchronized via {@link #latestViewLock}
*
* @see System#currentTimeMillis()
*/
// protected final Set shunnedMembers = Collections.synchronizedSet(new HashSet());
protected final Map shunnedMembers = new ConcurrentHashMap();
/**
* Members that have sent a shutdown message. This is used to suppress
* suspect processing in JGroups that otherwise becomes pretty aggressive
* when a member is shutting down.
*/
private final Map shutdownMembers = new BoundedLinkedHashMap(1000);
/**
* per bug 39552, keep a list of members that have been shunned and for
* which a message is printed. Contents of this list are cleared at the
* same time they are removed from {@link #shunnedMembers}.
*
* Accesses to this list need to be synchronized via {@link #latestViewLock}
*/
protected final HashSet shunnedAndWarnedMembers = new HashSet();
/**
* The identities and birth-times of others that we have allowed into
* membership at the distributed system level, but have not yet appeared
* in a jgroups view.
* <p>
* Keys are instances of {@link InternalDistributedMember}, values are
* Longs indicating the time this member was shunned.
* <p>
* Members are removed when a view containing them is processed. If,
* after {@link #surpriseMemberTimeout} milliseconds have passed, a view
* containing the member has not arrived, the member is removed from
* membership and member-left notification is performed.
* <p>
* Accesses to this list need to be synchronized via {@link #latestViewLock}
*
* @see System#currentTimeMillis()
*/
protected final Map<InternalDistributedMember, Long> surpriseMembers = new ConcurrentHashMap();
/**
* the timeout interval for surprise members. This is calculated from
* the member-timeout setting
*/
protected int surpriseMemberTimeout;
/**
* javagroups can skip views and omit telling us about a crashed member.
* This map holds a history of suspected members that we use to detect
* crashes.
*/
private final Map<InternalDistributedMember, Long> suspectedMembers = new ConcurrentHashMap();
/**
* the timeout interval for suspected members
*/
private final long suspectMemberTimeout = 180000;
/** sleep period, in millis, that the user of this manager should slumber after creating
the manager. This is advice from the JChannel itself when it detects a concurrent
startup race condition that requires a settling period. */
private long channelPause = 0;
/** whether time-based statistics should be gathered */
// private boolean enableClockStats = DistributionStats.enableClockStats;
/** whether time-based statistics should be gathered for up/down events in jgroup stacks */
private boolean enableJgStackStats = Boolean.getBoolean("p2p.enableJgStackStats");
/**
* Length of time, in seconds, that a member is retained in the zombie set
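* (default 300; override with -Dgemfire.shunned-member-timeout)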
*
* @see #shunnedMembers
*/
static private final int SHUNNED_SUNSET = Integer.getInteger(
"gemfire.shunned-member-timeout", 300).intValue();
/** has the jchannel been initialized? */
protected volatile boolean channelInitialized = false;
/**
* Set to true when the service should stop.
*/
protected volatile boolean shutdownInProgress = false;
/**
* Set to true when upcalls should be generated for
* events.
*/
protected volatile boolean processingEvents = false;
/**
* This is the latest viewId received from JGroups
*/
long lastViewId = -1;
/**
* the jgroups channel's UNICAST protocol object (used for flushing)
*/
com.gemstone.org.jgroups.protocols.UNICAST ucastProtocol;
/**
* the jgroups channel's VERIFY_SUSPECT protocol object
*/
com.gemstone.org.jgroups.protocols.VERIFY_SUSPECT verifySuspectProtocol;
/**
* if using UDP-based failure detection, this holds the FD protocol
*/
com.gemstone.org.jgroups.protocols.FD fdProtocol;
/**
* The underlying UDP protocol, if it exists
*/
UDP udpProtocol;
/**
* The underlying TCP protocol, if it exists
*/
TCP tcpProtocol;
/**
* The underlying NAKACK protocol, if it exists
*/
NAKACK nakAckProtocol;
/**
* the jgroups channel's FD_SOCK protocol object (for system failure)
*/
com.gemstone.org.jgroups.protocols.FD_SOCK fdSockProtocol;
/** an object used in the suspension of message transmission */
private Object sendSuspendMutex = new Object();
/** a flag stating that outgoing message transmission is suspended */
private boolean sendSuspended = false;
/** distribution manager statistics */
DMStats stats;
/**
A list of messages received during channel startup that couldn't be processed yet.
Additions or removals of this list must be synchronized
via {@link #startupLock}.
@since 5.0
*/
protected LinkedList startupMessages = new LinkedList();
/** the type of vm we're running in. This is also in the membership id,
but is needed by some methods before the membership id has been
created. */
int vmKind;
/**
* ARB: the map of latches is used to block peer handshakes till
* authentication is confirmed.
*/
final private HashMap memberLatch = new HashMap();
/**
* Insert our own MessageReceiver between us and the direct channel, in order
* to correctly filter membership events.
*
* TODO: DistributionManager shouldn't have to implement this interface.
*
* @author jpenney
*
*/
class MyDCReceiver implements DistributedMembershipListener
{
DistributedMembershipListener upCall;
/**
* Don't provide events until the caller has told us we are ready.
*
* Synchronization provided via JGroupMembershipManager.class.
*
* Note that in practice we only need to delay accepting the first
* client; we don't need to put this check before every call...
*
*/
MyDCReceiver(DistributedMembershipListener up) {
upCall = up;
}
public void messageReceived(DistributionMessage msg)
{
// bug 36851 - notify failure detection that we've had contact from a member
IpAddress addr = ((JGroupMember)msg.getSender().getNetMember()).getAddress();
if (fdProtocol != null) {
fdProtocol.messageReceivedFrom(addr);
}
if (verifySuspectProtocol != null) {
if (addr.getBirthViewId() < 0) {
if (logger.isDebugEnabled()) {
// if there is no view ID then this is not a valid address
logger.debug("Membership: invalid address found in sender of {}", msg);
}
} else {
verifySuspectProtocol.unsuspect(addr);
}
}
handleOrDeferMessage(msg);
}
public void newMemberConnected(final InternalDistributedMember member, final Stub id)
{
handleOrDeferSurpriseConnect(member, id);
}
public void memberDeparted(InternalDistributedMember id, boolean crashed, String reason)
{
try {
handleOrDeferRemove(id, crashed, reason);
}
catch (DistributedSystemDisconnectedException ignore) {
// ignore
}
catch (RuntimeException e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_ERROR_HANDLING_MEMBER_DEPARTURE__0), e);
}
}
public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
}
public void memberSuspect(InternalDistributedMember suspect, InternalDistributedMember whoSuspected) {
// the direct channel isn't currently a source of suspect events, though
// it does request initiation of suspicion through the membership
// manager
}
public boolean isShutdownMsgSent()
{
return upCall.isShutdownMsgSent();
}
public void membershipFailure(String reason, Throwable t)
{
upCall.membershipFailure(reason, t);
}
public void viewInstalled(NetView view) {
upCall.viewInstalled(view);
}
public DistributionManager getDM()
{
return upCall.getDM();
}
}
// /**
// * Perform major validation on a NetView
// *
// * @param v
// */
// private void validateView(Vector members)
// {
// for (int i = 0; i < members.size(); i++) {
// InternalDistributedMember dm = (InternalDistributedMember)members.elementAt(i);
// if ((!NO_DIRECT_CHANNEL && dm.getDirectChannelPort() <= 0)
// || dm.getVmKind() <= 0 || dm.getPort() <= 0) {
// members.remove(i);
// i--; // restart scan at new element at this point
// }
// }
// }
/** turn javagroups debugging on or off */
public static void setDebugJGroups(boolean flag) {
com.gemstone.org.jgroups.util.GemFireTracer.DEBUG=flag;
com.gemstone.org.jgroups.stack.Protocol.trace=flag;
}
/** if we connect to a locator that has NPD enabled then we enable it in this VM */
public void enableNetworkPartitionDetection() {
if (logger.isDebugEnabled()) {
logger.debug("Network partition detection is being enabled");
}
this.dconfig.setEnableNetworkPartitionDetection(true);
}
/**
* Analyze a given view object, generate events as appropriate
*
* @param newView
*/
protected void processView(long newViewId, NetView newView)
{
// Sanity check...
if (logger.isInfoEnabled(LogMarker.DM_VIEWS)) {
StringBuffer msg = new StringBuffer(200);
msg.append("Membership: Processing view (id = ");
msg.append(newViewId).append(") {");
msg.append(DistributionManager.printView(newView));
if (newView.getCrashedMembers().size() > 0) {
msg.append(" with unexpected departures [");
for (Iterator it=newView.getCrashedMembers().iterator(); it.hasNext(); ) {
msg.append(it.next());
if (it.hasNext()) {
msg.append(',');
}
}
msg.append(']');
}
msg.append("} on " + myMemberId.toString());
if (logger.isDebugEnabled()) {
logger.trace(LogMarker.DM_VIEWS, msg);
}
if (!newView.contains(myMemberId))
logger.info(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_THE_MEMBER_WITH_ID_0_IS_NO_LONGER_IN_MY_OWN_VIEW_1,
new Object[] {myMemberId, newView}));
}
// if (newView.getCrashedMembers().size() > 0) {
// // dump stack for debugging #39827
// OSProcess.printStacks(0);
// }
// We perform the update under a global lock so that other
// incoming events will not be lost in terms of our global view.
synchronized (latestViewLock) {
// first determine the version for multicast message serialization
Version version = Version.CURRENT;
for (Iterator<Map.Entry<InternalDistributedMember, Long>> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
InternalDistributedMember mbr = it.next().getKey();
Version itsVersion = mbr.getVersionObject();
if (itsVersion != null && version.compareTo(itsVersion) < 0) {
version = itsVersion;
}
}
for (InternalDistributedMember mbr: newView) {
Version itsVersion = mbr.getVersionObject();
if (itsVersion != null && itsVersion.compareTo(version) < 0) {
version = mbr.getVersionObject();
}
}
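// disable multicast while any member, including surprise members, is still running
// an older product version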
disableMulticastForRollingUpgrade = !version.equals(Version.CURRENT);
if (newViewId < lastViewId) {
// ignore this view since it is old news
if (newViewId < lastViewId && logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: Ignoring view (with id {}) since it is older than the last view (with id {}); ignoredView={}",
newViewId, lastViewId, DistributionManager.printView(newView));
}
return;
}
// Save previous view, for delta analysis
Vector priorView = latestView;
// update the view to reflect our changes, so that
// callbacks will see the new (updated) view.
lastViewId = newViewId;
latestView = newView;
// look for additions
Set warnShuns = new HashSet();
for (int i = 0; i < newView.size(); i++) { // additions
InternalDistributedMember m = (InternalDistributedMember)newView.elementAt(i);
// Once a member has been seen via JGroups, remove them from the
// newborn set
boolean wasSurprise = surpriseMembers.remove(m) != null;
// bug #45155 - membership view processing was slow, causing a member to connect as "surprise"
// and the surprise timeout removed the member and shunned it, keeping it from being
// recognized as a valid member when it was finally seen in a view
// if (isShunned(m)) {
// warnShuns.add(m);
// continue;
// }
// if it's in a view, it's no longer suspect
suspectedMembers.remove(m);
if (priorView.contains(m) || wasSurprise) {
continue; // already seen
}
// ARB: unblock any waiters for this particular member.
// i.e. signal any waiting threads in tcpconduit.
String authInit = this.dconfig.getSecurityPeerAuthInit();
boolean isSecure = authInit != null && authInit.length() != 0;
if (isSecure) {
Latch currentLatch;
if ((currentLatch = (Latch)memberLatch.get(m)) != null) {
currentLatch.release();
}
}
// fix for bug #42006, lingering old identity
Object oldStub = this.memberToStubMap.remove(m);
if (oldStub != null) {
this.stubToMemberMap.remove(oldStub);
}
if (shutdownInProgress()) {
logger.info(LogMarker.DM_VIEWS, LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_SHUNNING_MEMBER__0__DURING_OUR_SHUTDOWN, m));
addShunnedMember(m);
continue; // no additions processed after shutdown begins
} else {
boolean wasShunned = endShun(m); // bug #45158 - no longer shun a process that is now in view
if (wasShunned && logger.isTraceEnabled(LogMarker.DM_VIEWS)) {
logger.debug("No longer shunning {} as it is in the current membership view", m);
}
}
logger.info(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_PROCESSING_ADDITION__0_, m));
try {
listener.newMemberConnected(m, getStubForMember(m));
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (DistributedSystemDisconnectedException e) {
// don't log shutdown exceptions
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.info(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_ADDITION_OF__0, m), t);
}
} // additions
// now issue warnings outside of the synchronization
for (Iterator it=warnShuns.iterator(); it.hasNext(); ) {
warnShun((DistributedMember)it.next());
}
// look for departures
for (int i = 0; i < priorView.size(); i++) { // departures
InternalDistributedMember m = (InternalDistributedMember)priorView.elementAt(i);
if (newView.contains(m)) {
continue; // still alive
}
if (surpriseMembers.containsKey(m)) {
continue; // member has not yet appeared in JGroups view
}
try {
logger.info(LogMarker.DM_VIEWS, LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_PROCESSING_DEPARTING_MEMBER__0_, m));
removeWithViewLock(m,
newView.getCrashedMembers().contains(m) || suspectedMembers.containsKey(m)
, "departed JGroups view");
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.info(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_REMOVAL_OF__0, m), t);
}
} // departures
// expire surprise members, add others to view
long oldestAllowed = System.currentTimeMillis() - this.surpriseMemberTimeout;
for (Iterator<Map.Entry<InternalDistributedMember, Long>> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<InternalDistributedMember, Long> entry = it.next();
Long birthtime = (Long)entry.getValue();
if (birthtime.longValue() < oldestAllowed) {
it.remove();
InternalDistributedMember m = entry.getKey();
logger.info(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m));
removeWithViewLock(m, true, "not seen in membership view in "
+ this.surpriseMemberTimeout + "ms");
}
else {
if (!latestView.contains(entry.getKey())) {
latestView.add(entry.getKey());
}
}
}
// expire suspected members
oldestAllowed = System.currentTimeMillis() - this.suspectMemberTimeout;
for (Iterator it=suspectedMembers.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
Long birthtime = (Long)entry.getValue();
if (birthtime.longValue() < oldestAllowed) {
InternalDistributedMember m = (InternalDistributedMember)entry.getKey();
it.remove();
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: expiring suspect member <{}>", m);
}
}
}
try {
listener.viewInstalled(latestView);
startCleanupTimer();
}
catch (DistributedSystemDisconnectedException se) {
}
} // synchronized
logger.info(LogMarker.DM_VIEWS, LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_FINISHED_VIEW_PROCESSING_VIEWID___0, Long.valueOf(newViewId)));
}
/**
* Convert from a Jgroups View to a GemFire Netview
*
* @param v
* @return the NetView
*/
static protected NetView viewToMemberView(View v)
{
Vector vmem = v.getMembers();
NetView members = new NetView(v.getMembers().size(), v.getVid().getId());
Address leadmember = v.getLeadMember();
Address creator = v.getCreator();
for (int i = 0; i < vmem.size(); i++) {
IpAddress a = (IpAddress)vmem.get(i);
JGroupMember m = new JGroupMember(a);
InternalDistributedMember d = new InternalDistributedMember(m);
members.add(d);
if (leadmember != null && leadmember.equals(a)) {
members.setLeadMember(m);
}
if (creator != null && creator.equals(a)) {
members.setCreator(m);
}
}
// if there's a suspect set, process it and set it as the crashed set
Set smem = v.getSuspectedMembers();
if (smem != null) {
Set snmem = new HashSet(smem.size());
for (Iterator it=smem.iterator(); it.hasNext(); ) {
IpAddress a = (IpAddress)it.next();
JGroupMember m = new JGroupMember(a);
InternalDistributedMember d = new InternalDistributedMember(m);
snmem.add(d);
}
members.setCrashedMembers(snmem);
}
return members;
}
/**
* A worker thread that reads incoming messages and adds them to the incoming
* queue.
*/
class Puller implements Receiver
{
/* ------------------- JGroups Receiver -------------------------- */
/**
* Called when a message is received.
* @param jgmsg
*/
public void receive(Message jgmsg) {
if (shutdownInProgress())
return;
Object o = null;
int messageLength = jgmsg.getLength();
if (messageLength == 0) {
// jgroups messages with no payload are used for protocol interchange, such
// as STABLE_GOSSIP
return;
}
// record the time spent getting from the socket to this method.
// see com.gemstone.org.jgroups.protocols.TP
if (DistributionStats.enableClockStats && jgmsg.timeStamp > 0) {
stats.incMessageChannelTime(DistributionStats.getStatTime()-jgmsg.timeStamp);
}
// admin-only VMs don't have caches, so we ignore cache operations
// multicast to them, avoiding deserialization cost and classpath
// problems
if ( (vmKind == DistributionManager.ADMIN_ONLY_DM_TYPE)
&& jgmsg.getIsDistributedCacheOperation()) {
if (logger.isTraceEnabled(LogMarker.DM))
logger.trace(LogMarker.DM, "Membership: admin VM discarding cache operation message {}", jgmsg.getObject());
return;
}
try {
o = jgmsg.getObject();
}
catch (IllegalArgumentException e) {
if (e.getCause() != null && e.getCause() instanceof java.io.IOException) {
logger.error(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_EXCEPTION_DESERIALIZING_MESSAGE_PAYLOAD_0, jgmsg), e.getCause());
return;
}
else {
throw e;
}
}
if (o == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_GEMFIRE_RECEIVED_NULL_MESSAGE_FROM__0, String.valueOf(jgmsg)));
logger.warn(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_MESSAGE_HEADERS__0, jgmsg.printObjectHeaders()));
return;
}
DistributionMessage msg = (DistributionMessage)o;
msg.resetTimestamp();
msg.setBytesRead(messageLength);
IpAddress sender = (IpAddress)jgmsg.getSrc();
if (sender == null) {
Exception e = new Exception(LocalizedStrings.JGroupMembershipManager_NULL_SENDER.toLocalizedString());
logger.warn(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_GEMFIRE_RECEIVED_A_MESSAGE_WITH_NO_SENDER_ADDRESS), e);
}
InternalDistributedMember dm;
if (isConnected()) {
dm = getMemberFromIpAddress(sender, true);
} else {
// if the channel isn't initialized getMemberFromIpAddress will block the
// UDP receive channel, eventually causing a SystemConnectException
JGroupMember jgm = new JGroupMember(sender);
dm = new InternalDistributedMember(jgm);
}
msg.setSender(dm);
try {
handleOrDeferMessage(msg);
}
catch (MemberShunnedException e) {
// message from non-member - ignore
}
}
/**
* Answers the group state; e.g., when joining.
* @return byte[]
*/
public byte[] getState() {
return new byte[0];
}
/**
* Sets the group state; e.g., when joining.
* @param state
*/
public void setState(byte[] state) {
}
/**
* Called when a change in membership has occurred.
* <b>No long running actions should be done in this callback.</b>
* If some long running action needs to be performed, it should be done in a separate thread.
*/
public void viewAccepted(View viewArg) {
handleOrDeferViewEvent(viewArg);
}
/**
* Called whenever a member is suspected of having crashed,
* but has not yet been excluded.
*/
public void suspect(SuspectMember suspectInfo) {
if (!shutdownInProgress()) {
handleOrDeferSuspect(suspectInfo);
}
}
/**
* Called whenever a member has passed suspect processing
*/
/**
* Called whenever the member needs to stop sending messages.
* When the next view is received (viewAccepted()), the member can resume sending
* messages. If a member does not comply, the message(s) sent between a block()
* and a matching viewAccepted() callback will probably be delivered in the next view.
* The block() callback is only needed by the Virtual Synchrony suite of protocols
* (FLUSH protocol), otherwise it will never be invoked.
*/
public void block() {
}
public void channelClosing(Channel c, final Exception e) {
if (e instanceof ShunnedAddressException) {
return; // channel creation will retry
}
if (JGroupMembershipManager.this.shutdownInProgress) {
return; // probably a jgroups race condition
}
if (!dconfig.getDisableAutoReconnect()) {
saveCacheXmlForReconnect(dconfig.getUseSharedConfiguration());
}
// make sure that we've received a connected channel and aren't responsible
// for the notification
if (JGroupMembershipManager.this.puller != null // not shutting down
&& JGroupMembershipManager.this.channel != null) { // finished connecting
// cache the exception so it can be appended to ShutdownExceptions
JGroupMembershipManager.this.shutdownCause = e;
AlertAppender.getInstance().shuttingDown();
// QA thought that the long JGroups stack traces were a nuisance
// I [bruce] changed the error messages to include enough info
// for us to pinpoint where the ForcedDisconnect occurred.
String reason;
Exception logException = e;
if (e instanceof ForcedDisconnectException) {
reason = "Channel closed: " + e;
logException = null;
}
else {
reason = "Channel closed";
}
if (!inhibitForceDisconnectLogging) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_SERVICE_FAILURE_0, reason), logException);
}
// stop server locators immediately since they may not have correct
// information. This has caused client failures in bridge/wan
// network-down testing
List locs = Locator.getLocators();
if (locs.size() > 0) {
for (Iterator it=locs.iterator(); it.hasNext(); ) {
InternalLocator loc = (InternalLocator)it.next();
// if (loc.isServerLocator()) { auto-reconnect requires locator to stop now
loc.stop(!JGroupMembershipManager.this.dconfig.getDisableAutoReconnect(), false);
// }
}
}
InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
if (system != null) {
DM dm = system.getDM();
if (dm != null) {
dm.setRootCause(e);
}
}
JGroupMembershipManager.this.uncleanShutdown(reason, e);
}
}
} // Puller class
/** the jgroups receiver object */
Puller puller;
/** an exception that caused the manager to shut down */
volatile Exception shutdownCause;
/**
* the timer used to perform periodic tasks
* @guarded.By latestViewLock
*/
private SystemTimer cleanupTimer;
/**
* Creates a Jgroups {@link Channel} using the GemFire-specified
* configuration.
*
* @param transport
* Transport configuration
*
* @throws GemFireConfigException
* The Jgroups config file is missing or malformed
* @throws DistributionException
* Something goes wrong while connecting to Jgroups
*/
private JChannel createChannel(RemoteTransportConfig transport, DistributionConfig theConfig)
{
this.shutdownCause = null;
JGroupMembershipManager.isMcastDiscovery = transport.isMcastDiscovery();
JGroupMembershipManager.isMcastEnabled = transport.isMcastEnabled();
// boolean isWindows = false;
// String os = System.getProperty("os.name");
// if (os != null) {
// if (os.indexOf("Windows") != -1) {
// isWindows = true;
// }
// }
String properties;
InputStream is;
if (JAVAGROUPS_CONFIG != null) {
File file = new File(JAVAGROUPS_CONFIG);
if (!file.exists()) {
throw new GemFireConfigException(LocalizedStrings.JGroupMembershipManager_JGROUPS_CONFIGURATION_FILE_0_DOES_NOT_EXIST.toLocalizedString(JAVAGROUPS_CONFIG));
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Reading Jgroups config from {}", file);
}
is = new FileInputStream(file);
}
catch (IOException ex) {
throw new GemFireConfigException(LocalizedStrings.JGroupMembershipManager_AN_IOEXCEPTION_WAS_THROWN_WHILE_OPENING_0.toLocalizedString(file), ex);
}
}
else {
String r = null;
if (JGroupMembershipManager.isMcastEnabled) {
r = DEFAULT_JAVAGROUPS_MCAST_CONFIG;
} else {
r = DEFAULT_JAVAGROUPS_TCP_CONFIG;
}
is = ClassPathLoader.getLatest().getResourceAsStream(getClass(), r);
if (is == null) {
throw new GemFireConfigException(LocalizedStrings.JGroupMembershipManager_CANNOT_FIND_0.toLocalizedString(r));
}
}
try {
//PlainConfigurator config = PlainConfigurator.getInstance(is);
//properties = config.getProtocolStackString();
StringBuffer sb = new StringBuffer(3000);
BufferedReader br;
if (JAVAGROUPS_CONFIG != null) {
br = new BufferedReader(new InputStreamReader(is));
} else {
br = new BufferedReader(new InputStreamReader(is, "US-ASCII"));
}
String input;
while ((input=br.readLine()) != null) {
sb.append(input);
}
br.close();
properties = sb.toString();
}
catch (Exception ex) {
throw new GemFireConfigException(LocalizedStrings.JGroupMembershipManager_AN_EXCEPTION_WAS_THROWN_WHILE_READING_JGROUPS_CONFIG.toLocalizedString(), ex);
}
// see whether the FD protocol or FD_SOCK protocol should be used
long fdTimeout = Long.getLong("gemfire.FD_TIMEOUT", 0).longValue(); // in 4.1.2 to force use of FD
if (JGroupMembershipManager.isMcastEnabled) {
properties = replaceStrings(properties, "MULTICAST_PORT", String.valueOf(transport.getMcastId().getPort()));
properties = replaceStrings(properties, "MULTICAST_HOST", transport.getMcastId().getHost().getHostAddress());
properties = replaceStrings(properties, "MULTICAST_TTL", String.valueOf(theConfig.getMcastTtl()));
properties = replaceStrings(properties, "MULTICAST_SEND_BUFFER_SIZE", String.valueOf(theConfig.getMcastSendBufferSize()));
properties = replaceStrings(properties, "MULTICAST_RECV_BUFFER_SIZE", String.valueOf(theConfig.getMcastRecvBufferSize()));
// set the nakack retransmit limit smaller than the frag size to allow
// room for nackack headers and src address
properties = replaceStrings(properties, "RETRANSMIT_LIMIT", String.valueOf(theConfig.getUdpFragmentSize()-256));
properties = replaceStrings(properties, "MAX_SENT_MSGS_SIZE", System.getProperty("p2p.maxSentMsgsSize", "0"));
}
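// the DISCARD protocol can be spliced into the stack to simulate dropped messages in testing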
if (Boolean.getBoolean("p2p.simulateDiscard")) {
properties = replaceStrings(properties, "DISCARD",
"com.gemstone.org.jgroups.protocols.DISCARD(down_thread=false; up_thread=false;"
+ "up=" + System.getProperty("p2p.simulateDiscard.received", "0") + ";"
+ "down=" + System.getProperty("p2p.simulateDiscard.sent", "0.05") + ")?");
}
else {
properties = replaceStrings(properties, "DISCARD", "");
}
if (!JGroupMembershipManager.isMcastDiscovery) {
if (JGroupMembershipManager.isMcastEnabled) {
// for non-mcast discovery, we use TCPGOSSIP and remove PING
properties = replaceStrings(properties, "OPTIONAL_GOSSIP_PROTOCOL",
"com.gemstone.org.jgroups.protocols.TCPGOSSIP("
+ "num_ping_requests=NUM_PING_REQUESTS;"
+ "timeout=DISCOVERY_TIMEOUT;" // bug #50084 - solo locator takes member-timeout ms to start. Added DISCOVERY_TIMEOUT setting
// bug #44928 - client "hung" trying to connect to locator so reduced this to member_timeout
+ "split-brain-detection=PARTITION_DETECTION;"
+ "gossip_refresh_rate=GOSSIP_REFRESH_RATE;"
+ "initial_hosts=INITIAL_HOSTS;"
+ "gossip_server_wait_time=GOSSIP_SERVER_WAIT_TIME;"
+ "num_initial_members=2;"
+ "up_thread=false;"
+ "down_thread=false)?");
properties = replaceStrings(properties, "OPTIONAL_PING_PROTOCOL", "");
}
// String locators = transport.locatorsString();
properties = replaceStrings(properties, "INITIAL_HOSTS", transport.locatorsString());
properties = replaceStrings(properties, "NUM_INITIAL_MEMBERS", System.getProperty("p2p.numInitialMembers", "1"));
// with locators, we don't want to become the initial coordinator, so disable this ability
boolean disableCoord = !Boolean.getBoolean("p2p.enableInitialCoordinator");
if (theConfig.getStartLocator() != null) {
disableCoord = false;
}
properties = replaceStrings(properties, "DISABLE_COORD", String.valueOf(disableCoord));
}
else {
// PING is the multicast discovery protocol
properties = replaceStrings(properties, "OPTIONAL_GOSSIP_PROTOCOL", "");
properties = replaceStrings(properties, "OPTIONAL_PING_PROTOCOL",
"com.gemstone.org.jgroups.protocols.PING("
+ "timeout=DISCOVERY_TIMEOUT;"
+ "down_thread=false;"
+ "up_thread=false;"
+ "num_initial_members=NUM_INITIAL_MEMBERS;"
+ "num_ping_requests=NUM_PING_REQUESTS)?");
// for multicast, we allow a client to become the coordinator and pray that jgroups
// won't mess up too badly
if (System.getProperty("p2p.enableInitialCoordinator") != null) {
properties = replaceStrings(properties, "DISABLE_COORD", String.valueOf(!Boolean.getBoolean("p2p.enableInitialCoordinator")));
}
else {
properties = replaceStrings(properties, "DISABLE_COORD", "false");
}
properties = replaceStrings(properties, "NUM_INITIAL_MEMBERS", System.getProperty("p2p.numInitialMembers", "2"));
}
long burstLimit = theConfig.getMcastFlowControl().getByteAllowance() / 4;
burstLimit = Long.getLong("p2p.retransmit-burst-limit", burstLimit);
properties = replaceStrings(properties, "RETRANSMIT_BURST_LIMIT", String.valueOf(burstLimit));
long discoveryTimeout = Long.getLong("p2p.discoveryTimeout", 5000).longValue();
properties = replaceStrings(properties, "DISCOVERY_TIMEOUT", ""+discoveryTimeout);
int defaultJoinTimeout = 17000;
int defaultNumPings = 1; // number of get_mbrs loops per findInitialMembers
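// use a longer default join timeout when locators are configured but none is running in this VM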
if (theConfig.getLocators().length() > 0 && !Locator.hasLocators()) {
defaultJoinTimeout = 60000;
}
int joinTimeout = Integer.getInteger("p2p.joinTimeout", defaultJoinTimeout).intValue();
int numPings = Integer.getInteger("p2p.discoveryProbes", defaultNumPings);
properties = replaceStrings(properties, "JOIN_TIMEOUT", ""+joinTimeout);
properties = replaceStrings(properties, "NUM_PING_REQUESTS", ""+numPings);
properties = replaceStrings(properties, "LEAVE_TIMEOUT", System.getProperty("p2p.leaveTimeout", "5000"));
properties = replaceStrings(properties, "SOCKET_TIMEOUT", System.getProperty("p2p.socket_timeout", "60000"));
final String gossipRefreshRate;
// if network partition detection is enabled, we must connect to the locators
// more frequently in order to make sure we're not isolated from them
if (theConfig.getEnableNetworkPartitionDetection()) {
if (!SocketCreator.FORCE_DNS_USE) {
IpAddress.resolve_dns = false; // do not resolve host names since DNS lookup can hang if the NIC fails
SocketCreator.resolve_dns = false;
}
}
gossipRefreshRate = System.getProperty("p2p.gossipRefreshRate", "57123");
properties = replaceStrings(properties, "GOSSIP_REFRESH_RATE", gossipRefreshRate);
// for the unicast recv buffer, we use a reduced buffer size if tcpconduit is enabled and multicast is
// not being used
if (JGroupMembershipManager.isMcastEnabled || transport.isTcpDisabled() ||
(theConfig.getUdpRecvBufferSize() != DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE) ) {
properties = replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", ""+theConfig.getUdpRecvBufferSize());
}
else {
properties = replaceStrings(properties, "UDP_RECV_BUFFER_SIZE", ""+DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE_REDUCED);
}
// bug #40436: both Windows and Linux machines will drop messages if the NIC is unplugged from the network.
// Solaris is unknown at this point. (Bruce 3/23/09) LOOPBACK ensures that failure-detection methods are
// received by the sender
properties = replaceStrings(properties, "LOOPBACK", "" + !Boolean.getBoolean("p2p.DISABLE_LOOPBACK"));
// View.MAX_VIEW_SIZE = theConfig.getUdpFragmentSize() - 50 - 100;
// AuthHeader.MAX_CREDENTIAL_SIZE = theConfig.getUdpFragmentSize() - 1000;
int[] range = theConfig.getMembershipPortRange();
properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_START", ""+range[0]);
properties = replaceStrings(properties, "MEMBERSHIP_PORT_RANGE_END", ""+range[1]);
properties = replaceStrings(properties, "UDP_SEND_BUFFER_SIZE", ""+theConfig.getUdpSendBufferSize());
properties = replaceStrings(properties, "UDP_FRAGMENT_SIZE", ""+theConfig.getUdpFragmentSize());
properties = replaceStrings(properties, "MAX_BUNDLE_SIZE", ""+(theConfig.getUdpFragmentSize()+3072));
properties = replaceStrings(properties, "FC_MAX_CREDITS", ""+theConfig.getMcastFlowControl().getByteAllowance());
properties = replaceStrings(properties, "FC_THRESHOLD", ""+theConfig.getMcastFlowControl().getRechargeThreshold());
properties = replaceStrings(properties, "FC_MAX_BLOCK", ""+theConfig.getMcastFlowControl().getRechargeBlockMs());
long fdt = (fdTimeout > 0) ? fdTimeout : theConfig.getMemberTimeout();
String fdts = String.valueOf(fdt);
properties = replaceStrings(properties, "MEMBER_TIMEOUT", fdts);
properties = replaceStrings(properties, "CONNECT_TIMEOUT", fdts);
// The default view-ack timeout in 7.0 is 12437 ms but is adjusted based on the member-timeout.
// We don't want a longer timeout than 12437 because new members will likely time out trying to
// connect because their join timeouts are set to expect a shorter period
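// e.g. a 5000ms member-timeout gives 5000 * 2 * 12437 / 10000 = 12437ms, which is the cap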
int ackCollectionTimeout = theConfig.getMemberTimeout() * 2 * 12437 / 10000;
if (ackCollectionTimeout < 1500) {
ackCollectionTimeout = 1500;
} else if (ackCollectionTimeout > 12437) {
ackCollectionTimeout = 12437;
}
ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
properties = replaceStrings(properties, "ACK_COLLECTION_TIMEOUT", ""+ackCollectionTimeout);
properties = replaceStrings(properties, "MAX_TRIES", System.getProperty("gemfire.FD_MAX_TRIES", "1"));
properties = replaceStrings(properties, "VIEW_SYNC_INTERVAL", String.valueOf(fdt));
if (!Boolean.getBoolean("p2p.enableBatching")) {
properties = replaceStrings(properties, "ENABLE_BUNDLING", "false");
properties = replaceStrings(properties, "BUNDLING_TIMEOUT", "30");
}
else {
properties = replaceStrings(properties, "ENABLE_BUNDLING", "true");
properties = replaceStrings(properties, "BUNDLING_TIMEOUT", System.getProperty("p2p.batchFlushTime", "30"));
}
properties = replaceStrings(properties, "OUTGOING_PACKET_HANDLER",
String.valueOf(Boolean.getBoolean("p2p.outgoingPacketHandler")));
properties = replaceStrings(properties, "INCOMING_PACKET_HANDLER",
String.valueOf(Boolean.getBoolean("p2p.incomingPacketHandler")));
properties = replaceStrings(properties, "PARTITION_DETECTION",
String.valueOf(theConfig.getEnableNetworkPartitionDetection()));
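// the threshold is treated as a percentage, so clamp it to the range [51, 100]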
int threshold = Integer.getInteger("gemfire.network-partition-threshold", 51);
if (threshold < 51) threshold = 51;
if (threshold > 100) threshold = 100;
properties = replaceStrings(properties, "PARTITION_THRESHOLD",
String.valueOf(threshold));
int weight = Integer.getInteger("gemfire.member-weight", 0);
properties = replaceStrings(properties, "MEMBER_WEIGHT", String.valueOf(weight));
properties = replaceStrings(properties, "GOSSIP_SERVER_WAIT_TIME", ""+theConfig.getLocatorWaitTime());
if (logger.isDebugEnabled()) {
logger.debug("Jgroups configuration: {}", properties);
}
JChannel myChannel = null;
synchronized (latestViewLock) {
try {
this.isJoining = true; // added for bug #44373
// connect
long start = System.currentTimeMillis();
boolean reattempt;
String channelName = GossipServer.CHANNEL_NAME;
boolean debugConnect = Boolean.getBoolean("p2p.debugConnect");
puller = new Puller();
GFBasicAdapter jgBasicAdapter = new GFJGBasicAdapter();
GFPeerAdapter jgPeerAdapter = new GFJGPeerAdapter(this, stats);
do {
reattempt = false;
myChannel = new JChannel(properties, jgBasicAdapter, jgPeerAdapter,
DistributionStats.enableClockStats, enableJgStackStats);
myChannel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
myChannel.setOpt(Channel.AUTO_GETSTATE, Boolean.FALSE);
myChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
this.channelInitialized = true;
// [bruce] set the puller now to avoid possibility of loss of messages
myChannel.setReceiver(puller);
if (debugConnect) {
setDebugJGroups(true);
}
// [bruce] the argument to connect() goes out with each message and forms the initial
// criterion for whether a message will be accepted for us by jgroups
try {
myChannel.connect(channelName);
}
catch (ShunnedAddressException e) {
if (logger.isDebugEnabled()) {
logger.debug("Address was shunned by membership coordinator - will reattempt");
}
myChannel.close();
reattempt = true;
}
}
while (reattempt);
long delta = System.currentTimeMillis() - start;
if (debugConnect) {
setDebugJGroups(false);
}
if (logger.isTraceEnabled(LogMarker.JGROUPS)) {
logger.trace(LogMarker.JGROUPS, "Connected JGroups stack: {}", myChannel.printProtocolSpec(false));
}
// Populate our initial view
View v = myChannel.getView();
if (v == null)
throw new DistributionException(LocalizedStrings.JGroupMembershipManager_NULL_VIEW_FROM_JGROUPS.toLocalizedString(),
new InternalGemFireError(LocalizedStrings.JGroupMembershipManager_NO_VIEW.toLocalizedString()));
if (this.directChannel != null) {
this.directChannel.setMembershipSize(v.getMembers().size());
}
lastViewId = v.getVid().getId();
latestView = viewToMemberView(v);
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "JGroups: initial view is {}", DistributionManager.printView(latestView));
}
logger.info(LogMarker.DISTRIBUTION, LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_CONNECTED_TO_JGROUPS_CHANNEL_TOOK__0__MS, delta));
// get the unicast protocol so we can flush it when necessary
ucastProtocol = (com.gemstone.org.jgroups.protocols.UNICAST)myChannel
.getProtocolStack().findProtocol("UNICAST");
Assert.assertTrue(ucastProtocol != null,
"Malformed protocol stack is missing UNICAST");
// Load this protocol for bug 36851 and for use in emergencyClose
fdProtocol = (FD)myChannel.getProtocolStack().findProtocol("FD");
// get the VERIFY_SUSPECT protocol for liveness notification
verifySuspectProtocol = (VERIFY_SUSPECT)myChannel.getProtocolStack()
.findProtocol("VERIFY_SUSPECT");
// Get other protocols we should touch during emergencyClose
fdSockProtocol = (FD_SOCK)myChannel.getProtocolStack().findProtocol("FD_SOCK");
udpProtocol = (UDP)myChannel.getProtocolStack().findProtocol("UDP");
tcpProtocol = (TCP)myChannel.getProtocolStack().findProtocol("TCP");
nakAckProtocol = (NAKACK)myChannel.getProtocolStack().findProtocol("NAKACK");
gmsProtocol = (GMS)myChannel.getProtocolStack().findProtocol("GMS");
return myChannel;
} catch (RuntimeException ex) {
throw ex;
}
catch (Exception ex) {
if (ex.getCause() != null && ex.getCause().getCause() instanceof SystemConnectException) {
throw (SystemConnectException)(ex.getCause().getCause());
}
throw new DistributionException(LocalizedStrings.JGroupMembershipManager_AN_EXCEPTION_WAS_THROWN_WHILE_CONNECTING_TO_JGROUPS.toLocalizedString(), ex);
}
finally {
this.isJoining = false;
}
} // synchronized
}
public JGroupMembershipManager() {
// caller must invoke initialize() after creating a JGMM
}
public JGroupMembershipManager initialize(
DistributedMembershipListener listener,
DistributionConfig config,
RemoteTransportConfig transport,
DMStats stats
) throws ConnectionException
{
Assert.assertTrue(listener != null);
Assert.assertTrue(config != null);
//you can use a dummy stats object to remove most stat overhead
//this.stats = new com.gemstone.gemfire.distributed.internal.LonerDistributionManager.DummyDMStats();
this.stats = stats;
this.listener = listener;
this.dconfig = config;
this.membershipCheckTimeout = config.getSecurityPeerMembershipTimeout();
this.wasReconnectingSystem = transport.getIsReconnectingDS();
this.oldDSMembershipSocket = (DatagramSocket)transport.getOldDSMembershipInfo();
if (!config.getDisableTcp()) {
dcReceiver = new MyDCReceiver(listener);
directChannel = new DirectChannel(this, dcReceiver, config, null);
}
int dcPort = 0;
if (!config.getDisableTcp()) {
dcPort = directChannel.getPort();
}
// FIXME: payload handling is inconsistent (note from JasonP.)
MemberAttributes.setDefaults(dcPort,
MemberAttributes.DEFAULT.getVmPid(),
MemberAttributes.DEFAULT.getVmKind(),
MemberAttributes.DEFAULT.getVmViewId(),
MemberAttributes.DEFAULT.getName(),
MemberAttributes.DEFAULT.getGroups(), MemberAttributes.DEFAULT.getDurableClientAttributes());
this.vmKind = MemberAttributes.DEFAULT.getVmKind(); // we need this during jchannel startup
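// a surprise member is allowed 20x the member-timeout (at least 20x the default) to appear
// in a view before being expired; override with -Dgemfire.surprise-member-timeout (milliseconds)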
surpriseMemberTimeout = Math.max(20 * DistributionConfig.DEFAULT_MEMBER_TIMEOUT,
20 * config.getMemberTimeout());
surpriseMemberTimeout = Integer.getInteger("gemfire.surprise-member-timeout", surpriseMemberTimeout).intValue();
try {
this.channel = createChannel(transport, config);
}
catch (RuntimeException e) {
if (directChannel != null) {
directChannel.disconnect(e);
}
throw e;
}
IpAddress myAddr = (IpAddress)channel.getLocalAddress();
MemberAttributes.setDefaults(dcPort,
MemberAttributes.DEFAULT.getVmPid(),
MemberAttributes.DEFAULT.getVmKind(),
myAddr.getBirthViewId(),
MemberAttributes.DEFAULT.getName(),
MemberAttributes.DEFAULT.getGroups(), MemberAttributes.DEFAULT.getDurableClientAttributes());
if (directChannel != null) {
directChannel.getConduit().setVmViewID(myAddr.getBirthViewId());
}
myMemberId = new InternalDistributedMember(myAddr.getIpAddress(),
myAddr.getPort(),
myAddr.splitBrainEnabled(),
myAddr.preferredForCoordinator(),
MemberAttributes.DEFAULT);
// in order to debug startup issues we need to announce the membership
// ID as soon as we know it
logger.info(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_entered_into_membership_in_group_0_with_id_1,
new Object[]{myMemberId}));
if (!dconfig.getDisableTcp()) {
this.conduit = directChannel.getConduit();
directChannel.setLocalAddr(myMemberId);
Stub stub = conduit.getId();
memberToStubMap.put(myMemberId, stub);
stubToMemberMap.put(stub, myMemberId);
}
this.hasConnected = true;
return this;
}
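// Illustrative usage sketch (not part of the original source): per the constructor
// contract above, a caller first constructs the manager and then wires it up, e.g.
//
//   JGroupMembershipManager mgr = new JGroupMembershipManager()
//       .initialize(listener, config, transport, stats);
//
// where the listener, config, transport and stats arguments are supplied by the
// hosting DistributionManager; initialize() creates the JChannel, establishes the
// local member ID and (unless TCP is disabled) the DirectChannel conduit.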
/** this is invoked by JGroups when there is a loss of quorum in the membership system */
public void quorumLost(Set failures, List remaining) {
// notify of quorum loss if split-brain detection is enabled (meaning we'll shut down) or
// if the loss is more than one member
boolean notify = failures.size() > 1;
if (!notify) {
JChannel ch = this.channel;
Address localAddr = (ch != null) ? ch.getLocalAddress() : null;
notify = (localAddr != null) && localAddr.splitBrainEnabled();
}
if (notify) {
if (inhibitForceDisconnectLogging) {
if (logger.isDebugEnabled()) {
logger.debug("<ExpectedException action=add>Possible loss of quorum</ExpectedException>");
}
}
logger.fatal(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_POSSIBLE_LOSS_OF_QUORUM_DETECTED, new Object[] {failures.size(), failures}));
if (inhibitForceDisconnectLogging) {
if (logger.isDebugEnabled()) {
logger.debug("<ExpectedException action=remove>Possible loss of quorum</ExpectedException>");
}
}
// get member IDs for the collections so we can notify the
// gemfire distributed system membership listeners
Set<InternalDistributedMember> idmFailures = new HashSet<InternalDistributedMember>();
for (Iterator it=failures.iterator(); it.hasNext(); ) {
IpAddress ipaddr = (IpAddress)it.next();
JGroupMember jgm = new JGroupMember(ipaddr);
InternalDistributedMember mbr = new InternalDistributedMember(jgm);
idmFailures.add(mbr);
}
List<InternalDistributedMember> idmRemaining = new ArrayList<InternalDistributedMember>(remaining.size());
for (Iterator it=remaining.iterator(); it.hasNext(); ) {
IpAddress ipaddr = (IpAddress)it.next();
JGroupMember jgm = new JGroupMember(ipaddr);
InternalDistributedMember mbr = new InternalDistributedMember(jgm);
idmRemaining.add(mbr);
}
try {
this.listener.quorumLost(idmFailures, idmRemaining);
} catch (CancelException e) {
// safe to ignore - a forced disconnect probably occurred
}
}
}
public boolean testMulticast() {
if (isMcastEnabled) {
return this.udpProtocol.testMulticast(dconfig.getMemberTimeout());
} else {
return true;
}
}
/**
* Remove a member, or queue a startup operation to do so
* @param dm the member to shun
* @param crashed true if crashed
* @param reason the reason, esp. if crashed
*/
protected void handleOrDeferRemove(InternalDistributedMember dm,
boolean crashed, String reason) {
synchronized(startupLock) {
if (!processingEvents) {
startupMessages.add(new StartupEvent(dm, crashed, reason));
return;
}
}
removeMember(dm, crashed, reason);
}
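// Note (added for clarity): the handleOrDefer* methods in this class share one
// pattern -- while startup-event processing has not yet begun, the event is queued
// under startupLock and drained later by startEventProcessing(); afterwards the
// event is handled inline. Condensed, using the names from this class:
//
//   synchronized (startupLock) {
//     if (!processingEvents) { startupMessages.add(event); return; }
//   }
//   process(event);   // performed outside startupLock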
/**
* Remove a member. {@link #latestViewLock} must be held
* before this method is called. If member is not already shunned,
* the uplevel event handler is invoked.
*
* @param dm the member to remove
* @param crashed true if the member crashed rather than departing cleanly
* @param reason the reason for the removal
*/
protected void removeWithViewLock(InternalDistributedMember dm,
boolean crashed, String reason) {
boolean wasShunned = isShunned(dm);
// Delete resources
destroyMember(dm, crashed, reason);
if (wasShunned) {
return; // Explicit deletion, no upcall.
}
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: dispatching uplevel departure event for < {} >", dm);
}
try {
listener.memberDeparted(dm, crashed, reason);
}
catch (DistributedSystemDisconnectedException se) {
// let's not get huffy about it
}
}
/**
* Automatic removal of a member (for internal
* use only). Synchronizes on {@link #latestViewLock} and then deletes
* the member.
*
* @param dm the member to remove
* @param crashed true if the member crashed rather than departing cleanly
* @param reason the reason for the removal
*/
protected void removeMember(InternalDistributedMember dm,
boolean crashed, String reason)
{
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
StringBuffer sb = new StringBuffer(200);
sb.append("Membership: removing <")
.append(dm)
.append(">; crashed = ")
.append(crashed)
.append("; reason = ");
if (reason != null && (reason.indexOf("NoSuchMemberException") >= 0)) {
sb.append("tcp/ip connections closed");
}
else {
sb.append(reason);
}
logger.trace(LogMarker.DISTRIBUTION_VIEWS, sb);
}
synchronized (latestViewLock) {
removeWithViewLock(dm, crashed, reason);
}
}
/**
* Process a surprise connect event, or place it on the startup queue.
* @param member the member
* @param stub its stub
*/
protected void handleOrDeferSurpriseConnect(InternalDistributedMember member,
Stub stub) {
synchronized (startupLock) {
if (!processingEvents) {
startupMessages.add(new StartupEvent(member, stub));
return;
}
}
processSurpriseConnect(member, stub);
}
public void startupMessageFailed(DistributedMember mbr, String failureMessage) {
// fix for bug #40666
addShunnedMember((InternalDistributedMember)mbr);
// fix for bug #41329, hang waiting for replies
try {
listener.memberDeparted((InternalDistributedMember)mbr, true, "failed to pass startup checks");
}
catch (DistributedSystemDisconnectedException se) {
// let's not get huffy about it
}
}
/**
* Logic for handling a direct connection event (message received
* from a member not in the JGroups view). Does not employ the
* startup queue.
* <p>
* Must be called with {@link #latestViewLock} held. Waits
* until there is a stable view. If the member has already
* been added, simply returns; else adds the member.
*
* @param dm the member joining
* @param stub the member's stub
* @return false if the member was rejected (shunned or from an obsolete view), true otherwise
*/
public boolean addSurpriseMember(DistributedMember dm,
Stub stub) {
final InternalDistributedMember member = (InternalDistributedMember)dm;
Stub s = null;
boolean warn = false;
synchronized(latestViewLock) {
// At this point, the join may have been discovered by
// other means.
if (latestView.contains(member)) {
return true;
}
if (surpriseMembers.containsKey(member)) {
return true;
}
if (latestView.getViewNumber() > member.getVmViewId()) {
// tell the process that it should shut down distribution.
// Run in a separate thread so we don't hold the view lock during the request. Bug #44995
new Thread(Thread.currentThread().getThreadGroup(),
"Removing shunned GemFire node " + member) {
@Override
public void run() {
// fix for bug #42548
// this is an old member that shouldn't be added
logger.fatal(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_Invalid_Surprise_Member, new Object[]{member, latestView}));
requestMemberRemoval(member, "this member is no longer in the view but is initiating connections");
}
}.start();
addShunnedMember(member);
return false;
}
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: Received message from surprise member: <{}>. My view number is {} it is {}",
member, latestView.getViewNumber(), member.getVmViewId());
}
// Adding him to this set ensures we won't remove him if a new
// JGroups view comes in and he's still not visible.
surpriseMembers.put(member, Long.valueOf(System.currentTimeMillis()));
if (shutdownInProgress()) {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: new member during shutdown ignored: <{}>", member);
}
// Force disconnect, esp. the TCPConduit
String msg = LocalizedStrings.JGroupMembershipManager_THIS_DISTRIBUTED_SYSTEM_IS_SHUTTING_DOWN.toLocalizedString();
if (directChannel != null) {
try {
directChannel.closeEndpoint(member, msg);
} catch (DistributedSystemDisconnectedException e) {
// ignore - happens during shutdown
}
}
destroyMember(member, false, msg); // for good luck
return true; // allow during shutdown
}
if (isShunned(member)) {
warn = true;
surpriseMembers.remove(member);
} else {
// Now that we're sure the member is new, add them.
if (logger.isTraceEnabled(LogMarker.DM_VIEWS)) {
logger.trace(LogMarker.DM_VIEWS, "Membership: Processing surprise addition <{}>", member);
}
// make sure the surprise-member cleanup task is running
if (this.cleanupTimer == null) {
startCleanupTimer();
} // cleanupTimer == null
// fix for bug #42006, lingering old identity
Object oldStub = this.memberToStubMap.remove(member);
if (oldStub != null) {
this.stubToMemberMap.remove(oldStub);
}
s = stub == null ? getStubForMember(member) : stub;
// Make sure that channel information is consistent
addChannel(member, s);
// Ensure that the member is accounted for in the view
// Conjure up a new view including the new member. This is necessary
// because we are about to tell the listener about a new member, so
// the listener should rightfully expect that the member is in our
// membership view.
// However, we put the new member at the end of the list. This
// should ensure he's not chosen as an elder.
// This will get corrected when he finally shows up in the JGroups
// view.
NetView newMembers = new NetView(latestView, latestView.getViewNumber());
newMembers.add(member);
latestView = newMembers;
}
}
if (warn) { // fix for bug #41538 - deadlock while alerting
logger.warn(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_IGNORING_SURPRISE_CONNECT_FROM_SHUNNED_MEMBER_0, member));
} else {
listener.newMemberConnected(member, s);
}
return !warn;
}
/** starts periodic task to perform cleanup chores such as expire surprise members */
private void startCleanupTimer() {
synchronized(this.latestViewLock) {
if (this.cleanupTimer != null) {
return;
}
DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
if (ds != null && ds.isConnected()) {
this.cleanupTimer = new SystemTimer(ds, true);
SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() {
@Override
public void run2() {
synchronized(latestViewLock) {
long oldestAllowed = System.currentTimeMillis() - surpriseMemberTimeout;
for (Iterator it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
Long birthtime = (Long)entry.getValue();
if (birthtime.longValue() < oldestAllowed) {
it.remove();
InternalDistributedMember m = (InternalDistributedMember)entry.getKey();
logger.info(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m));
removeWithViewLock(m, true, "not seen in membership view in "
+ surpriseMemberTimeout + "ms");
}
}
}
}
};
this.cleanupTimer.scheduleAtFixedRate(st, surpriseMemberTimeout, surpriseMemberTimeout/3);
} // ds != null && ds.isConnected()
}
}
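// Timing note (added for clarity): the cleanup task above runs every
// surpriseMemberTimeout/3 milliseconds and expires any surprise member whose
// recorded birth time is older than surpriseMemberTimeout, so a member that never
// shows up in a JGroups view is removed after roughly one timeout period plus, at
// worst, one additional check interval.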
/**
* Dispatch the distribution message, or place it on the startup queue.
*
* @param msg the message to process
*/
protected void handleOrDeferMessage(DistributionMessage msg) {
synchronized(startupLock) {
if (!processingEvents) {
startupMessages.add(new StartupEvent(msg));
return;
}
}
processMessage(msg);
}
public void warnShun(DistributedMember m) {
synchronized (latestViewLock) {
if (!shunnedMembers.containsKey(m)) {
return; // not shunned
}
if (shunnedAndWarnedMembers.contains(m)) {
return; // already warned
}
shunnedAndWarnedMembers.add(m);
} // synchronized
// issue warning outside of sync since it may cause messaging and we don't
// want to hold the view lock while doing that
logger.warn(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_DISREGARDING_SHUNNED_MEMBER_0, m));
}
/**
* Logic for processing a distribution message.
* <p>
* It is possible to receive messages not consistent with our view.
* We handle this here, and generate an uplevel event if necessary
* @param msg the message
*/
protected void processMessage(DistributionMessage msg) {
boolean isNew = false;
InternalDistributedMember m = msg.getSender();
boolean shunned = false;
// First grab the lock: check the sender against our stabilized view.
synchronized (latestViewLock) {
if (isShunned(m)) {
if (msg instanceof StartupMessage) {
endShun(m);
}
else {
// fix for bug 41538 - sick alert listener causes deadlock
// due to view lock being held during messaging
shunned = true;
}
} // isShunned
if (!shunned) {
isNew = !latestView.contains(m) && !surpriseMembers.containsKey(m);
// If it's a new sender, wait our turn, generate the event
if (isNew) {
shunned = !addSurpriseMember(m, getStubForMember(m));
} // isNew
}
// Latch the view before we unlock
} // synchronized
if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging
warnShun(m);
// if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.info(/*LogMarker.DISTRIBUTION_VIEWS, */"Membership: Ignoring message from shunned member <{}>:{}", m, msg);
// }
throw new MemberShunnedException(getStubForMember(m));
}
// TODO: once upon a time, in a galaxy far far away...
//
// I would love for this upcall to be delivered in
// an environment that guarantees message stability, i.e.
// use a conditional variable to prevent view changes
// to occur while this callback is being processed.
//
// In practice, some of the getInitialImage stuff seems
// to cause deadlocks of almost dlock-level complexity
// when I implemented that, so I've backed off.
//
// (Implementation note: I had a viewInUseCount integer
// and a viewStable conditional variable, so that, with
// the latestViewLock held, this method incremented the count
// before and decremented it afterwards; if the count is
// 0, the viewStable got a signalAll() to allow the
// view changers to wake up and process their events.)
//
// The downside of the current implementation is that
// the sender of this message might disappear before the
// message can be processed. It's also possible for new
// members to arrive while a message is being processed,
// but my intuition suggests this is not as likely to
// cause a problem.
//
// As it stands, the system is (mostly) tolerant of these issues, but I'm
// very concerned that there may be subtle lock/elder/HA bugs that we
// haven't caught yet that may be affected by this.
listener.messageReceived(msg);
}
/**
* Process a new view object, or place on the startup queue
* @param viewArg the new view
*/
protected void handleOrDeferViewEvent(View viewArg) {
if (this.isJoining) {
// bug #44373 - queue all view messages while connecting the jgroups channel.
// This is done under the latestViewLock, but we can't block here because
// we're sitting in the UDP reader thread.
synchronized(startupLock) {
startupMessages.add(new StartupEvent(viewArg));
return;
}
}
synchronized (latestViewLock) {
synchronized(startupLock) {
if (!processingEvents) {
startupMessages.add(new StartupEvent(viewArg));
return;
}
}
// view processing can take a while, so we use a separate thread
// to avoid blocking the jgroups stack
NetView newView = viewToMemberView(viewArg);
long newId = viewArg.getVid().getId();
if (logger.isTraceEnabled(LogMarker.DM_VIEWS)) {
logger.trace(LogMarker.DM_VIEWS, "Membership: queuing new view for processing, id = {}, view = {}",
newId, newView);
}
ViewMessage v = new ViewMessage(myMemberId, newId, newView,
JGroupMembershipManager.this);
listener.messageReceived(v);
}
}
/**
* Process a suspect-member notification, ignoring it if startup-event
* processing has not yet begun
* @param suspectInfo the suspected member and the member that raised the suspicion
*/
protected void handleOrDeferSuspect(SuspectMember suspectInfo) {
synchronized (latestViewLock) {
synchronized(startupLock) {
if (!processingEvents) {
return;
}
}
InternalDistributedMember suspect = getMemberFromIpAddress((IpAddress)suspectInfo.suspectedMember, true);
InternalDistributedMember who = getMemberFromIpAddress((IpAddress)suspectInfo.whoSuspected, true);
this.suspectedMembers.put(suspect, Long.valueOf(System.currentTimeMillis()));
try {
listener.memberSuspect(suspect, who);
}
catch (DistributedSystemDisconnectedException se) {
// let's not get huffy about it
}
}
}
/**
* Process a potential direct connect. Does not use
* the startup queue. It grabs the {@link #latestViewLock}
* and then processes the event.
* <p>
* It is a <em>potential</em> event, because we don't know until we've
* grabbed a stable view if this is really a new member.
*
* @param member the connecting member
* @param stub the member's stub
*/
private void processSurpriseConnect(
InternalDistributedMember member,
Stub stub)
{
synchronized (latestViewLock) {
addSurpriseMember(member, stub);
}
}
/**
* Dispatch routine for processing a single startup event
* @param o the startup event to handle
*/
private void processStartupEvent(StartupEvent o) {
// Most common events first
if (o.isDistributionMessage()) { // normal message
try {
processMessage(o.dmsg);
}
catch (MemberShunnedException e) {
// message from non-member - ignore
}
}
else if (o.isJgView()) { // view event
processView(o.jgView.getVid().getId(), viewToMemberView(o.jgView));
}
else if (o.isDepartureEvent()) { // departure
removeMember(o.member, o.crashed, o.reason);
}
else if (o.isConnect()) { // connect
processSurpriseConnect(o.member, o.stub);
}
else // sanity
throw new InternalGemFireError(LocalizedStrings.JGroupMembershipManager_UNKNOWN_STARTUP_EVENT_0.toLocalizedString(o));
}
/**
* Special mutex to create a critical section for
* {@link #startEventProcessing()}
*/
private final Object startupMutex = new Object();
public void startEventProcessing()
{
// Only allow one thread to perform the work
synchronized (startupMutex) {
if (logger.isDebugEnabled())
logger.debug("Membership: draining startup events.");
// Remove the backqueue of messages, but allow
// additional messages to be added.
for (;;) {
StartupEvent ev;
// Only grab the mutex while reading the queue.
// Other events may arrive while we're attempting to
// drain the queue. This is OK, we'll just keep processing
// events here until we've caught up.
synchronized (startupLock) {
int remaining = startupMessages.size();
if (remaining == 0) {
// While holding the lock, flip the bit so that
// no more events get put into startupMessages, and
// notify all waiters to proceed.
processingEvents = true;
startupLock.notifyAll();
break; // ...and we're done.
}
if (logger.isDebugEnabled()) {
logger.debug("Membership: {} remaining startup message(s)", remaining);
}
ev = (StartupEvent)startupMessages.removeFirst();
} // startupLock
try {
processStartupEvent(ev);
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_ERROR_HANDLING_STARTUP_EVENT), t);
}
} // for
if (logger.isDebugEnabled())
logger.debug("Membership: finished processing startup events.");
} // startupMutex
}
public void waitForEventProcessing() throws InterruptedException {
// First check outside of a synchronized block. Cheaper and sufficient.
if (Thread.interrupted()) throw new InterruptedException();
if (processingEvents)
return;
if (logger.isDebugEnabled()) {
logger.debug("Membership: waiting until the system is ready for events");
}
for (;;) {
conduit.getCancelCriterion().checkCancelInProgress(null);
synchronized (startupLock) {
// Now check using a memory fence and synchronization.
if (processingEvents)
break;
boolean interrupted = Thread.interrupted();
try {
startupLock.wait();
}
catch (InterruptedException e) {
interrupted = true;
conduit.getCancelCriterion().checkCancelInProgress(e);
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // synchronized
} // for
if (logger.isDebugEnabled()) {
logger.debug("Membership: continuing");
}
}
public Object getViewLock() {
return this.latestViewLock;
}
/**
* Returns a copy (possibly not current) of the current
* view (a list of {@link DistributedMember}s)
*/
public NetView getView()
{
// Grab the latest view under a mutex...
NetView v;
synchronized (latestViewLock) {
v = latestView;
}
// Create a copy (read-only)
NetView result = new NetView(v.size(), v.getViewNumber());
result.setCreator(v.getCreator());
for (int i = 0; i < v.size(); i ++) {
InternalDistributedMember m = (InternalDistributedMember)v.elementAt(i);
if (isShunned(m)) {
continue;
}
result.add(m);
}
result.setLeadMember(v.getLeadMember());
return result;
}
/**
* test hook<p>
* The lead member is the eldest member with partition detection enabled.<p>
* If no members have partition detection enabled, there will be no
* lead member and this method will return null.
* @return the lead member associated with the latest view
*/
public DistributedMember getLeadMember() {
// note - we go straight to the jgroups stack because the
// DistributionManager queues view changes in a serial executor, where
// they're asynchronously installed. The DS may still see the old leader
if (gmsProtocol == null) {
return null;
}
IpAddress jlead = (IpAddress)gmsProtocol.getLeadMember();
InternalDistributedMember leader;
if (jlead == null) {
leader = null;
}
else {
leader = getMemberFromIpAddress(jlead, true);
}
return leader;
}
/**
* test hook
* @return the current membership view coordinator
*/
public DistributedMember getCoordinator() {
// note - we go straight to the jgroups stack because the
// DistributionManager queues view changes in a serial executor, where
// they're asynchronously installed. The DS may still see the old coordinator
IpAddress jcoord = (IpAddress)gmsProtocol.determineCoordinator();
InternalDistributedMember dm;
if (jcoord == null) {
dm = null;
}
else {
dm = getMemberFromIpAddress(jcoord, true);
}
return dm;
}
/** cached group-management system protocol for test hook */
GMS gmsProtocol;
public boolean memberExists(InternalDistributedMember m) {
Vector v;
synchronized (latestViewLock) {
v = latestView;
}
return v.contains(m);
}
/**
* Returns the identity associated with this member. WARNING: this value will
* be returned after the channel is closed, but in that case it is good for
* logging purposes only. :-)
*/
public InternalDistributedMember getLocalMember()
{
return myMemberId;
}
public void postConnect()
{
if (channelPause > 0) {
logger.info(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_PAUSING_TO_ALLOW_OTHER_CONCURRENT_PROCESSES_TO_JOIN_THE_DISTRIBUTED_SYSTEM));
try {
Thread.sleep(channelPause);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
channelPause = 0;
}
}
/**
* break any potential circularity in {@link #loadEmergencyClasses()}
* @see SystemFailure#loadEmergencyClasses()
*/
private static volatile boolean emergencyClassesLoaded = false;
/**
* inhibits logging of ForcedDisconnectException to keep dunit logs clean
* while testing this feature
*/
protected static volatile boolean inhibitForceDisconnectLogging;
/**
* Ensure that the critical classes from JGroups and the TCP conduit
* implementation get loaded.
*
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
if (emergencyClassesLoaded) return;
emergencyClassesLoaded = true;
com.gemstone.org.jgroups.protocols.FD_SOCK.loadEmergencyClasses();
com.gemstone.org.jgroups.protocols.FD.loadEmergencyClasses();
DirectChannel.loadEmergencyClasses();
ProtocolStack.loadEmergencyClasses();
UDP.loadEmergencyClasses();
TP.loadEmergencyClasses();
}
/**
* Close the receiver, avoiding all potential deadlocks and
* eschewing any attempts at being graceful.
*
* @see SystemFailure#emergencyClose()
*/
public void emergencyClose() {
final boolean DEBUG = SystemFailure.TRACE_CLOSE;
setShutdown();
puller = null;
// Problematic; let each protocol discover and die on its own
// if (channel != null && channel.isConnected()) {
// channel.close();
// }
// We can't call close() because they will allocate objects. Attempt
// a surgical strike and touch the important protocols.
// MOST important, kill the FD protocols...
if (fdSockProtocol != null) {
if (DEBUG) {
System.err.println("DEBUG: emergency close of FD_SOCK");
}
fdSockProtocol.emergencyClose();
}
if (fdProtocol != null) {
if (DEBUG) {
System.err.println("DEBUG: emergency close of FD");
}
fdProtocol.emergencyClose();
}
// Close the TCPConduit sockets...
if (directChannel != null) {
if (DEBUG) {
System.err.println("DEBUG: emergency close of DirectChannel");
}
directChannel.emergencyClose();
}
// Less important, but for cleanliness, get rid of some
// datagram sockets...
if (udpProtocol != null) {
if (DEBUG) {
System.err.println("DEBUG: emergency close of UDP");
}
udpProtocol.emergencyClose();
}
if (tcpProtocol != null) {
if (DEBUG) {
System.err.println("DEBUG: emergency close of TCP");
}
tcpProtocol.emergencyClose();
}
// Clear the TimeScheduler, it might act up and cause problems?
ProtocolStack ps = this.channel.getProtocolStack();
if (ps != null) {
if (DEBUG) {
System.err.println("DEBUG: emergency close of ProtocolStack");
}
ps.emergencyClose();
}
// TODO: could we guarantee not to allocate objects? We're using Darrel's
// factory, so it's possible that an unsafe implementation could be
// introduced here.
// stubToMemberMap.clear();
// memberToStubMap.clear();
this.timer.cancel();
if (DEBUG) {
System.err.println("DEBUG: done closing JGroupMembershipManager");
}
}
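// Ordering note (added for clarity): emergencyClose() deliberately avoids the normal
// close() paths, which may allocate or block, and instead touches the protocols in
// rough order of importance -- the failure-detection protocols (FD_SOCK, FD) first so
// no further suspicion processing occurs, then the DirectChannel sockets, then the
// UDP and TCP transports, and finally the ProtocolStack and the timer.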
/**
* in order to avoid split-brain occurring when a member is shutting down due to
* race conditions in view management we add it as a shutdown member when we receive
* a shutdown message. This is not the same as a SHUNNED member.
*/
public void shutdownMessageReceived(InternalDistributedMember id, String reason) {
if (logger.isDebugEnabled()) {
logger.debug("Membership: recording shutdown status of {}", id);
}
synchronized(this.shutdownMembers) {
this.shutdownMembers.put(id, id);
}
}
/**
* returns true if a shutdown message has been received from the given address but
* that member is still in the membership view or is a surprise member.
*/
public boolean isShuttingDown(IpAddress addr) {
InternalDistributedMember mbr = (InternalDistributedMember)ipAddrToMemberMap.get(addr);
if (mbr == null) {
JGroupMember m = new JGroupMember(addr);
mbr = new InternalDistributedMember(m);
}
synchronized(shutdownMembers) {
return shutdownMembers.containsKey(mbr);
}
}
public void shutdown()
{
setShutdown(); // for safety
puller = null; // this tells other methods that a shutdown has been requested
// channel.disconnect(); close does this for us
if (channel != null && channel.isConnected()) {
try {
channel.closeAsync();
}
catch (DistributedSystemDisconnectedException e) {
// this can happen if the stack was already shut down by a ForcedDisconnectException
}
}
// [bruce] Do not null out the channel w/o adding appropriate synchronization
if (directChannel != null) {
directChannel.disconnect(null);
// Make sure that channel information is consistent
// Probably not important in this particular case, but just
// to be consistent...
synchronized (latestViewLock) {
destroyMember(myMemberId, false, "orderly shutdown");
}
}
if (logger.isDebugEnabled()) {
logger.debug("Membership: channel closed");
}
}
public void uncleanShutdown(String reason, final Exception e) {
inhibitForcedDisconnectLogging(false);
if (this.channel != null && !this.channel.closing()) {
try {
// bug #39827 - close the channel without disconnecting it first
this.channel.shutdown();
}
catch (DistributedSystemDisconnectedException se) {
// ignore - we're already shutting down
}
}
if (this.directChannel != null) {
this.directChannel.disconnect(e);
}
// if (logWriter instanceof ManagerLogWriter) {
// // avoid trying to send any alerts
// ((ManagerLogWriter)logWriter).shuttingDown(); // TODO:LOG:ALERTS: can we do something here to avoid sending alerts?
// }
// first shut down communication so we don't do any more harm to other
// members
JGroupMembershipManager.this.emergencyClose();
// we have to clear the view before notifying the membership listener,
// so that it won't try sending disconnect messages to members that
// aren't there. Otherwise, it sends the disconnect messages to other
// members, they ignore the "surprise" connections, and we hang.
//JGroupMembershipManager.this.clearView();
if (e != null) {
try {
if (JGroupMembershipManager.this.membershipTestHooks != null) {
List l = JGroupMembershipManager.this.membershipTestHooks;
for (Iterator it=l.iterator(); it.hasNext(); ) {
MembershipTestHook dml = (MembershipTestHook)it.next();
dml.beforeMembershipFailure(reason, e);
}
}
listener.membershipFailure(reason, e);
if (JGroupMembershipManager.this.membershipTestHooks != null) {
List l = JGroupMembershipManager.this.membershipTestHooks;
for (Iterator it=l.iterator(); it.hasNext(); ) {
MembershipTestHook dml = (MembershipTestHook)it.next();
dml.afterMembershipFailure(reason, e);
}
}
}
catch (RuntimeException re) {
logger.warn(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_EXCEPTION_CAUGHT_WHILE_SHUTTING_DOWN), re);
}
}
}
/** generate XML for the cache before shutting down due to forced disconnect */
public void saveCacheXmlForReconnect(boolean sharedConfigEnabled) {
// first save the current cache description so reconnect can rebuild the cache
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null && (cache instanceof Cache)) {
if (!Boolean.getBoolean("gemfire.autoReconnect-useCacheXMLFile")
&& !cache.isSqlfSystem() && !sharedConfigEnabled) {
try {
logger.info("generating XML to rebuild the cache after reconnect completes");
StringPrintWriter pw = new StringPrintWriter();
CacheXmlGenerator.generate((Cache)cache, pw, true, false);
String cacheXML = pw.toString();
cache.getCacheConfig().setCacheXMLDescription(cacheXML);
logger.info("XML generation completed: {}", cacheXML);
} catch (CancelException e) {
logger.info(LocalizedMessage.create(LocalizedStrings.JGroupMembershipManager_PROBLEM_GENERATING_CACHE_XML), e);
}
} else if (sharedConfigEnabled && !cache.getCacheServers().isEmpty()) {
// we need to retain a cache-server description if this JVM was started by gfsh
List<BridgeServerCreation> list = new ArrayList<BridgeServerCreation>(cache.getCacheServers().size());
for (Iterator it = cache.getCacheServers().iterator(); it.hasNext(); ) {
CacheServer cs = (CacheServer)it.next();
BridgeServerCreation bsc = new BridgeServerCreation(cache, cs);
list.add(bsc);
}
cache.getCacheConfig().setCacheServerCreation(list);
logger.info("CacheServer configuration saved");
}
}
}
public boolean requestMemberRemoval(DistributedMember mbr, String reason) {
if (mbr.equals(this.myMemberId)) {
return false;
}
if (gmsProtocol == null) {
return false; // no GMS = no communication
}
IpAddress jcoord = (IpAddress)gmsProtocol.determineCoordinator();
if (jcoord == null) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_REQUEST_INITIATED_TO_REMOVE_MEMBER_0_BUT_THERE_IS_NO_GROUP_COORDINATOR, mbr));
return false;
}
else {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_MEMBERSHIP_REQUESTING_REMOVAL_OF_0_REASON_1,
new Object[] {mbr, reason}));
Message msg = new Message();
msg.setDest(jcoord);
msg.isHighPriority = true;
InternalDistributedMember imbr = (InternalDistributedMember)mbr;
msg.putHeader(GMS.name, new GMS.GmsHeader(
GMS.GmsHeader.REMOVE_REQ, ((JGroupMember)imbr.getNetMember()).getAddress(), reason));
Exception problem = null;
try {
channel.send(msg);
}
catch (ChannelClosedException e) {
Throwable cause = e.getCause();
if (cause instanceof ForcedDisconnectException) {
problem = (Exception) cause;
} else {
problem = e;
}
}
catch (ChannelNotConnectedException e) {
problem = e;
}
catch (IllegalArgumentException e) {
problem = e;
}
if (problem != null) {
if (this.shutdownCause != null) {
Throwable cause = this.shutdownCause;
// If ForcedDisconnectException occurred then report it as actual
// problem.
if (cause instanceof ForcedDisconnectException) {
problem = (Exception) cause;
} else {
Throwable ne = problem;
while (ne.getCause() != null) {
ne = ne.getCause();
}
try {
ne.initCause(this.shutdownCause);
}
catch (IllegalArgumentException selfCausation) {
// fix for bug 38895 - the cause is already in place
}
}
}
if (!dconfig.getDisableAutoReconnect()) {
saveCacheXmlForReconnect(dconfig.getUseSharedConfiguration());
}
listener.membershipFailure("Channel closed", problem);
throw new DistributedSystemDisconnectedException("Channel closed", problem);
}
return true;
}
}
public void suspectMembers(Set members, String reason) {
for (Iterator it=members.iterator(); it.hasNext(); ) {
suspectMember((DistributedMember)it.next(), reason);
}
}
public void suspectMember(DistributedMember mbr, String reason) {
if (mbr != null) {
Address jgmbr = ((JGroupMember)((InternalDistributedMember)mbr)
.getNetMember()).getAddress();
this.fdSockProtocol.suspect(jgmbr, false, reason);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Attempt to suspect member with null ID detected. This usually means that the target has left membership during startup.");
}
}
}
public void suspectMember(Stub mbr, String reason) {
InternalDistributedMember idm = getMemberForStub(mbr, false);
if (idm != null) {
Address jgmbr = ((JGroupMember)idm.getNetMember()).getAddress();
this.fdSockProtocol.suspect(jgmbr, false, reason);
}
}
/**
* Like memberExists() this checks to see if the given ID is in the current
* membership view. If it is in the view though we try to connect to its
* failure-detection port to see if it's still around. If we can't then
* suspect processing is initiated on the member with the given reason string.
* @param mbr the member to verify
* @param reason why the check is being done (must not be blank/null)
* @return true if the member checks out
*/
public boolean verifyMember(DistributedMember mbr, String reason) {
if (mbr != null && memberExists((InternalDistributedMember)mbr)) {
Address jgmbr = ((JGroupMember)((InternalDistributedMember)mbr)
.getNetMember()).getAddress();
return this.fdSockProtocol.checkSuspect(jgmbr, reason);
}
return false;
}
/**
* Perform the grossness associated with sending a message over
* a DirectChannel
*
* @param destinations the list of destinations
* @param content the message
* @param theStats the statistics object to update
* @return all recipients who did not receive the message (null if
* all received it)
* @throws NotSerializableException if the message is not serializable
*/
private Set directChannelSend(InternalDistributedMember[] destinations,
DistributionMessage content,
com.gemstone.gemfire.distributed.internal.DistributionStats theStats)
throws NotSerializableException
{
boolean allDestinations;
InternalDistributedMember[] keys;
if (content.forAll()) {
allDestinations = true;
synchronized (latestViewLock) {
Set keySet = memberToStubMap.keySet();
keys = new InternalDistributedMember[keySet.size()];
keys = (InternalDistributedMember[])keySet.toArray(keys);
}
}
else {
allDestinations = false;
keys = destinations;
}
int sentBytes = 0;
try {
sentBytes = directChannel.send(this, keys, content,
this.dconfig.getAckWaitThreshold(),
this.dconfig.getAckSevereAlertThreshold());
if (theStats != null)
theStats.incSentBytes(sentBytes);
}
catch (DistributedSystemDisconnectedException ex) {
if (this.shutdownCause != null) {
throw new DistributedSystemDisconnectedException("DistributedSystem is shutting down", this.shutdownCause);
} else {
throw ex; // see bug 41416
}
}
catch (ConnectExceptions ex) {
if (allDestinations)
return null;
List members = ex.getMembers(); // We need to return this list of failures
// SANITY CHECK: If we fail to send a message to an existing member
// of the view, we have a serious error (bug36202).
Vector view = getView(); // grab a recent view, excluding shunned members
// Iterate through members and causes in tandem :-(
Iterator it_mem = members.iterator();
Iterator it_causes = ex.getCauses().iterator();
while (it_mem.hasNext()) {
InternalDistributedMember member = (InternalDistributedMember)it_mem.next();
Throwable th = (Throwable)it_causes.next();
if (!view.contains(member)) {
continue;
}
logger.fatal(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_FAILED_TO_SEND_MESSAGE_0_TO_MEMBER_1_VIEW_2,
new Object[] {content, member, view}), th);
// Assert.assertTrue(false, "messaging contract failure");
}
return new HashSet(members);
} // catch ConnectionExceptions
catch (ToDataException e) {
throw e; // error in user data
}
catch (CancelException e) {
// not interesting, we're just shutting down
throw e;
}
catch (IOException e) {
if (logger.isDebugEnabled()) {
logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
}
if (e instanceof NotSerializableException) {
throw (NotSerializableException)e;
}
}
catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
}
throw e;
}
catch (Error e) {
if (logger.isDebugEnabled()) {
logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
}
throw e;
}
return null;
}
/*
* (non-Javadoc)
* @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#isConnected()
*/
public boolean isConnected() {
if (channel != null && channel.isConnected()) {
return true;
}
if (!channelInitialized) {
return true; // startup noise
}
return false;
}
/**
* Returns true if the distributed system is in the process of auto-reconnecting.
* Otherwise returns false.
*/
public boolean isReconnectingDS() {
if (this.hasConnected) {
// if the jgroups channel has been connected then we aren't in the
// middle of a reconnect attempt in this instance of the distributed system
return false;
} else {
return this.wasReconnectingSystem;
}
}
/**
* A quorum checker is used during reconnect to perform quorum
* probes. It is made available here for the UDP protocol to
* hand off ping-pong responses to the checker.
*/
public QuorumCheckerImpl getQuorumCheckerImpl() {
return this.quorumChecker;
}
/**
* During jgroups connect the UDP protocol will invoke
* this method to find the DatagramSocket it should use instead of
* creating a new one.
*/
public DatagramSocket getMembershipSocketForUDP() {
return this.oldDSMembershipSocket;
}
@Override
public QuorumChecker getQuorumChecker() {
if ( ! (this.shutdownCause instanceof ForcedDisconnectException) ) {
return null;
}
if (gmsProtocol == null || udpProtocol == null) {
return null;
}
if (this.quorumChecker != null) {
return this.quorumChecker;
}
QuorumCheckerImpl impl = new QuorumCheckerImpl(
gmsProtocol.getLastView(), gmsProtocol.getPartitionThreshold(), udpProtocol.getMembershipSocket());
impl.initialize();
this.quorumChecker = impl;
return impl;
}
@Override
public void releaseQuorumChecker(QuorumChecker checker) {
((QuorumCheckerImpl)checker).teardown();
InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
if (system == null || !system.isConnected()) {
DatagramSocket sock = (DatagramSocket)checker.getMembershipInfo();
if (sock != null && !sock.isClosed()) {
sock.close();
}
}
}
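// Illustrative reconnect-side sketch (hypothetical caller, not part of the original
// source): a quorum checker is only handed out after a forced disconnect and should
// be released when probing is finished, e.g.
//
//   QuorumChecker qc = mgr.getQuorumChecker();
//   if (qc != null) {
//     try {
//       // perform quorum probes while attempting to reconnect
//     } finally {
//       mgr.releaseQuorumChecker(qc);
//     }
//   }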
public Set send(InternalDistributedMember[] destinations,
DistributionMessage msg,
com.gemstone.gemfire.distributed.internal.DistributionStats theStats)
throws NotSerializableException
{
Set result = null;
boolean allDestinations = msg.forAll();
// don't allow messages to be sent if we're not connected to the
// jgroups channel
if (!channel.isConnected()) {
// bug34917 - if the channel was previously initialized, it must have got
// into trouble and we're exiting
if (channelInitialized) {
Exception cause = this.shutdownCause;
if (cause == null && channel.exitEvent != null &&
(channel.exitEvent.getArg() instanceof Exception)) {
cause = (Exception)channel.exitEvent.getArg();
}
throw new DistributedSystemDisconnectedException("Distributed System is shutting down", cause);
}
// If we get here, we are starting up, so just report a failure.
if (allDestinations)
return null;
else {
result = new HashSet();
for (int i = 0; i < destinations.length; i ++)
result.add(destinations[i]);
return result;
}
}
// Handle trivial cases
if (destinations == null) {
if (logger.isTraceEnabled())
logger.trace("Membership: Message send: returning early because null set passed in: '{}'", msg);
return null; // trivially: all recipients received the message
}
if (destinations.length == 0) {
if (logger.isTraceEnabled())
logger.trace("Membership: Message send: returning early because empty destination list passed in: '{}'", msg);
return null; // trivially: all recipients received the message
}
msg.setSender(myMemberId);
msg.setBreadcrumbsInSender();
Breadcrumbs.setProblem(null);
boolean useMcast = false;
if (isMcastEnabled) {
useMcast = !disableMulticastForRollingUpgrade && (msg.getMulticast() || allDestinations);
// if (multicastTest && !useMcast) {
// useMcast = (msg instanceof DistributedCacheOperation.CacheOperationMessage);
// }
}
// some messages are sent to everyone, so we use UDP to avoid having to
// obtain tcp/ip connections to them
boolean sendViaJGroups = isForceUDPCommunications(); // enable when bug #46438 is fixed: || msg.sendViaJGroups();
// Wait for sends to be unsuspended
if (sendSuspended) {
synchronized(sendSuspendMutex) {
while (sendSuspended) {
conduit.getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
sendSuspendMutex.wait(10);
conduit.getCancelCriterion().checkCancelInProgress(null);
}
catch(InterruptedException e) {
interrupted = true;
conduit.getCancelCriterion().checkCancelInProgress(e);
}
finally {
if (interrupted)
Thread.currentThread().interrupt();
}
} // while
} // synchronized
} // sendSuspended
// Handle TCPConduit case
if (!useMcast && !dconfig.getDisableTcp() && !sendViaJGroups) {
result = directChannelSend(destinations, msg, theStats);
// If the message was a broadcast, don't enumerate failures.
if (allDestinations)
return null;
else {
return result;
}
}
// Otherwise, JGroups is going to handle this message
Address local = channel.getLocalAddress();
if (useMcast) {
if (logger.isTraceEnabled(LogMarker.DM))
logger.trace(LogMarker.DM, "Membership: sending < {} > via multicast", msg);
// if there are unack'd unicast messages, we need to wait for
// them to be processed.
if (!DISABLE_UCAST_FLUSH && ucastProtocol != null && ucastProtocol.getNumberOfUnackedMessages() > 0) {
flushUnicast();
}
Exception problem = null;
try {
long startSer = theStats.startMsgSerialization();
Vector<IpAddress> mbrs = this.channel.getView().getMembers();
Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
theStats.endMsgSerialization(startSer);
theStats.incSentBytes(jmsg.getLength());
channel.send(jmsg);
}
catch (ChannelClosedException e) {
Throwable cause = e.getCause();
if (cause instanceof ForcedDisconnectException) {
problem = (Exception) cause;
} else {
problem = e;
}
}
catch (ChannelNotConnectedException e) {
problem = e;
}
catch (IllegalArgumentException e) {
problem = e;
}
if (problem != null) {
if (this.shutdownCause != null) {
Throwable cause = this.shutdownCause;
// If ForcedDisconnectException occurred then report it as actual
// problem.
if (cause instanceof ForcedDisconnectException) {
problem = (Exception) cause;
} else {
Throwable ne = problem;
while (ne.getCause() != null) {
ne = ne.getCause();
}
ne.initCause(this.shutdownCause);
}
}
final String channelClosed = LocalizedStrings.JGroupMembershipManager_CHANNEL_CLOSED.toLocalizedString();
listener.membershipFailure(channelClosed, problem);
throw new DistributedSystemDisconnectedException(channelClosed, problem);
}
} // useMcast
else { // ! useMcast
int len = destinations.length;
List<JGroupMember> calculatedMembers; // explicit list of members
int calculatedLen; // == calculatedMembers.len
if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all
// Grab a copy of the current membership
Vector v = getView();
// Construct the list
calculatedLen = v.size();
calculatedMembers = new LinkedList<JGroupMember>();
for (int i = 0; i < calculatedLen; i ++) {
InternalDistributedMember m = (InternalDistributedMember)v.elementAt(i);
calculatedMembers.add((JGroupMember)m.getNetMember());
}
} // send to all
else { // send to explicit list
calculatedLen = len;
calculatedMembers = new LinkedList<JGroupMember>();
for (int i = 0; i < calculatedLen; i ++) {
calculatedMembers.add((JGroupMember)destinations[i].getNetMember());
}
} // send to explicit list
Int2ObjectOpenHashMap messages = new Int2ObjectOpenHashMap();
long startSer = theStats.startMsgSerialization();
boolean firstMessage = true;
for (Iterator it=calculatedMembers.iterator(); it.hasNext(); ) {
JGroupMember mbr = (JGroupMember)it.next();
short version = mbr.getAddress().getVersionOrdinal();
if ( !messages.containsKey(version) ) {
Message jmsg = createJGMessage(msg, local, version);
messages.put(version, jmsg);
if (firstMessage) {
theStats.incSentBytes(jmsg.getLength());
firstMessage = false;
}
}
}
theStats.endMsgSerialization(startSer);
Collections.shuffle(calculatedMembers);
int i=0;
for (Iterator<JGroupMember> it=calculatedMembers.iterator();
it.hasNext(); i++) { // send individually
JGroupMember mbr = it.next();
IpAddress to = mbr.getAddress();
short version = to.getVersionOrdinal();
Message jmsg = (Message)messages.get(version);
if (logger.isTraceEnabled(LogMarker.DM))
logger.trace(LogMarker.DM, "Membership: Sending '{}' to '{}' via udp unicast", msg, mbr);
Exception problem = null;
try {
Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
tmp.setDest(to);
channel.send(tmp);
}
catch (ChannelClosedException e) {
Throwable cause = e.getCause();
if (cause instanceof ForcedDisconnectException) {
problem = (Exception) cause;
} else {
problem = e;
}
}
catch (ChannelNotConnectedException e) {
problem = e;
}
catch (IllegalArgumentException e) {
problem = e;
}
if (problem != null) {
if (this.shutdownCause != null) {
Throwable cause = this.shutdownCause;
// If ForcedDisconnectException occurred then report it as actual
// problem.
if (cause instanceof ForcedDisconnectException) {
problem = (Exception) cause;
} else {
Throwable ne = problem;
while (ne.getCause() != null) {
ne = ne.getCause();
}
ne.initCause(this.shutdownCause);
}
}
listener.membershipFailure("Channel closed", problem);
throw new DistributedSystemDisconnectedException("Channel closed", problem);
}
} // send individually
} // !useMcast
// The contract is that every destination enumerated in the
// message should have received the message. If one left
// (i.e., left the view), we signal it here.
if (allDestinations)
return null;
result = new HashSet();
Vector newView = getView();
for (int i = 0; i < destinations.length; i ++) {
InternalDistributedMember d = destinations[i];
if (!newView.contains(d)) {
result.add(d);
}
}
if (result.size() == 0)
return null;
return result;
}
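// Routing summary (added for clarity) for send() above:
//   - multicast enabled and the message is multicast/for-all  -> one JGroups mcast send
//     (after flushing any unacked unicast messages);
//   - otherwise, if TCP is not disabled and UDP is not forced -> DirectChannel (TCP);
//   - otherwise -> JGroups unicast, serializing the message once per product version
//     present among the destinations and sending a copy to each member.
// In every case a non-null return value is the set of destinations that are no longer
// in the view and therefore may not have received the message.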
/**
* Creates a JGroups message holding a GemFire
* DistributionMessage. It sets the appropriate flags in the Message and properly
* serializes the DistributionMessage for the recipient's product version.
*
* @param gfmsg the DistributionMessage
* @param src the sender address
* @param version the version of the recipient
* @return the new message
*/
private Message createJGMessage(DistributionMessage gfmsg, Address src, short version) {
if(gfmsg instanceof DirectReplyMessage) {
((DirectReplyMessage) gfmsg).registerProcessor();
}
Message msg = new Message();
msg.setDest(null);
msg.setSrc(src);
msg.setVersion(version);
//log.info("Creating message with payload " + gfmsg);
if (gfmsg instanceof com.gemstone.gemfire.internal.cache.DistributedCacheOperation.CacheOperationMessage) {
com.gemstone.gemfire.internal.cache.DistributedCacheOperation.CacheOperationMessage cmsg =
(com.gemstone.gemfire.internal.cache.DistributedCacheOperation.CacheOperationMessage)gfmsg;
msg.isCacheOperation = true;
if (cmsg.getProcessorId() == 0 && cmsg.getMulticast()) {
msg.bundleable = true;
}
}
msg.isHighPriority = (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
|| gfmsg instanceof HighPriorityDistributionMessage);
if (msg.isHighPriority) {
msg.bundleable = false;
}
try {
HeapDataOutputStream out_stream =
new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version)); // GemStoneAddition
DataSerializer.writeObject(gfmsg, out_stream); // GemStoneAddition
msg.setBuffer(out_stream.toByteArray());
}
catch(IOException ex) {
// GemStoneAddition - we need the cause to figure out what went wrong
IllegalArgumentException ia = new
IllegalArgumentException("Error serializing message");
ia.initCause(ex);
throw ia;
//throw new IllegalArgumentException(ex.toString());
}
return msg;
}
/**
* @throws DistributionException if the conduit could not be restarted
*/
public void reset() throws DistributionException
{
if (conduit != null) {
try {
conduit.restart();
} catch (ConnectionException e) {
throw new DistributionException(LocalizedStrings.JGroupMembershipManager_UNABLE_TO_RESTART_CONDUIT.toLocalizedString(), e);
}
}
}
// MembershipManager method
@Override
public void forceUDPMessagingForCurrentThread() {
forceUseJGroups.set(Boolean.TRUE);
}
// MembershipManager method
@Override
public void releaseUDPMessagingForCurrentThread() {
forceUseJGroups.set(null);
}
private boolean isForceUDPCommunications() {
Boolean forced = forceUseJGroups.get();
return forced == Boolean.TRUE;
}
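// Illustrative usage sketch (hypothetical caller, not part of the original source):
// the two MembershipManager methods above bracket a block of sends that must bypass
// the DirectChannel and travel over JGroups instead, e.g.
//
//   mgr.forceUDPMessagingForCurrentThread();
//   try {
//     // sends issued by this thread are now routed via JGroups/UDP
//   } finally {
//     mgr.releaseUDPMessagingForCurrentThread();
//   }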
/**
* Establish a sleep period to be instituted after the JChannel finishes connecting.
* This is used by the channel itself when it detects a concurrent startup
* situation and has elected this process as the group coordinator. This allows
* other processes to merge into this process's view (or vice versa) before
* cache processing commences
*/
public void establishChannelPause(long period) {
channelPause = period;
}
/**
* Get or create stub for given member
*/
public Stub getStubForMember(InternalDistributedMember m)
{
if (shutdownInProgress) {
throw new DistributedSystemDisconnectedException(LocalizedStrings.JGroupMembershipManager_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), this.shutdownCause);
}
// Bogus stub object if direct channels not being used
if (conduit == null)
return new Stub(m.getIpAddress(), m.getPort(), m.getVmViewId());
// Return existing one if it is already in place
Stub result;
result = (Stub)memberToStubMap.get(m);
if (result != null)
return result;
synchronized (latestViewLock) {
// Do all of this work in a critical region to prevent
// members from slipping in during shutdown
if (shutdownInProgress())
return null; // don't try to create a stub during shutdown
if (isShunned(m))
return null; // don't let zombies come back to life
// OK, create one. Update the table to reflect the creation.
result = directChannel.createConduitStub(m);
addChannel(m, result);
}
return result;
}
public InternalDistributedMember getMemberForStub(Stub s, boolean validated)
{
synchronized (latestViewLock) {
if (shutdownInProgress) {
throw new DistributedSystemDisconnectedException(LocalizedStrings.JGroupMembershipManager_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), this.shutdownCause);
}
InternalDistributedMember result = (InternalDistributedMember)
stubToMemberMap.get(s);
if (result != null) {
if (validated && !this.latestView.contains(result)) {
// Do not return this member unless it is in the current view.
if (!surpriseMembers.containsKey(result)) {
// if not a surprise member, this stub is lingering and should be removed
stubToMemberMap.remove(s);
memberToStubMap.remove(result);
}
result = null;
// fall through to see if there is a newer member using the same direct port
}
}
if (result == null) {
// it may have not been added to the stub->idm map yet, so check the current view
for (Iterator it=this.latestView.iterator(); it.hasNext(); ) {
InternalDistributedMember idm = (InternalDistributedMember)it.next();
if (idm.getIpAddress().equals(s.getInetAddress())
&& idm.getDirectChannelPort() == s.getPort()) {
addChannel(idm, s);
return idm;
}
}
}
return result;
}
}
public void setShutdown()
{
// shutdown state needs to be set atomically between this
// class and the direct channel. Don't allow new members
// to slip in.
synchronized (latestViewLock) {
shutdownInProgress = true;
}
}
public boolean shutdownInProgress() {
// Impossible condition (bug36329): make sure that we check DM's
// view of shutdown here
return shutdownInProgress || listener.getDM().shutdownInProgress();
}
/**
* Add a mapping from the given member to the given stub. Must be
* synchronized on {@link #latestViewLock} by caller.
*
* @param member the member to associate with the stub
* @param theChannel the stub (channel information) for the member
*/
protected void addChannel(InternalDistributedMember member, Stub theChannel)
{
if (theChannel != null) {
// Don't overwrite existing stub information with a null
this.memberToStubMap.put(member, theChannel);
// Can't create reverse mapping if the stub is null
this.stubToMemberMap.put(theChannel, member);
}
}
/**
* Attempt to ensure that the jgroups unicast channel has no outstanding unack'd messages.
*/
public void flushUnicast() {
if (ucastProtocol != null) {
long flushStart = 0;
if (DistributionStats.enableClockStats)
flushStart = stats.startUcastFlush();
try {
suspendSends();
long start = System.currentTimeMillis();
while (ucastProtocol.getNumberOfUnackedMessages() > 0) {
try {
Thread.sleep(3);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
// don't wait longer than the member timeout
if (System.currentTimeMillis() - start > dconfig.getMemberTimeout()) {
break;
}
}
}
finally {
if (DistributionStats.enableClockStats)
stats.endUcastFlush(flushStart);
resumeSends();
}
}
}
/**
* Suspend outgoing messaging
*/
private void suspendSends() {
synchronized(sendSuspendMutex) {
sendSuspended = true;
}
}
/**
* Resume outgoing messaging
*/
private void resumeSends() {
synchronized(sendSuspendMutex) {
sendSuspended = false;
sendSuspendMutex.notifyAll();
}
}
/**
* Clean up and create consistent new view with member removed.
* No uplevel events are generated.
*
* Must be called with the {@link #latestViewLock} held.
*/
protected void destroyMember(final InternalDistributedMember member,
boolean crashed, final String reason) {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS))
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: destroying < {} >", member);
// Clean up the maps
Stub theChannel = (Stub)memberToStubMap.remove(member);
if (theChannel != null) {
this.stubToMemberMap.remove(theChannel);
}
this.ipAddrToMemberMap.remove(
new IpAddress(member.getIpAddress(), member.getPort()));
// Make sure it is removed from the view
synchronized (this.latestViewLock) {
if (latestView.contains(member)) {
NetView newView = new NetView(latestView, latestView.getViewNumber());
newView.remove(member);
latestView = newView;
}
}
surpriseMembers.remove(member);
// Trickiness: there is a minor recursion
// with addShunnedMember, since it will
// attempt to destroy really really old members. Performing the check
// here breaks the recursion.
if (!isShunned(member)) {
addShunnedMember(member);
}
final DirectChannel dc = directChannel;
if (dc != null) {
// if (crashed) {
// dc.closeEndpoint(member, reason);
// }
// else
// Bug 37944: make sure this is always done in a separate thread,
// so that shutdown conditions don't wedge the view lock
{ // fix for bug 34010
Thread t = new Thread() {
@Override
public void run() {
try {
Thread.sleep(
Integer.getInteger("p2p.disconnectDelay", 3000).intValue());
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
// Keep going, try to close the endpoint.
}
if (logger.isDebugEnabled())
logger.debug("Membership: closing connections for departed member {}", member);
// close connections, but don't do membership notification since it's already been done
dc.closeEndpoint(member, reason, false);
}
};
t.setDaemon(true);
t.setName("disconnect thread for " + member);
t.start();
} // fix for bug 34010
}
}
public Stub getDirectChannel()
{
synchronized (latestViewLock) {
return (Stub)memberToStubMap.get(myMemberId);
}
}
/** replace strings in a jgroups properties file. This is used during channel creation to
insert parameters from gemfire.properties into the jgroups stack configuration */
private static String replaceStrings(String properties, String property, String value)
{
StringBuffer sb = new StringBuffer();
int start = 0;
int index = properties.indexOf(property);
while (index != -1) {
sb.append(properties.substring(start, index));
sb.append(value);
start = index + property.length();
index = properties.indexOf(property, start);
}
sb.append(properties.substring(start));
return sb.toString();
}
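// Example (hypothetical values, added for clarity):
//   replaceStrings("mcast_port=MCAST_PORT", "MCAST_PORT", "12345")
// returns "mcast_port=12345"; every occurrence of the property token in the jgroups
// properties string is substituted with the supplied value.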
/**
* Indicate whether the given member is in the zombie list (dead or dying).
* <p>
* This also checks the time the given member was shunned, and
* has the side effect of removing the member from the
* list if it was shunned too far in the past.
*
* @guarded.By latestViewLock
* @param m the member in question
* @return true if the given member is a zombie
*/
protected boolean isShunned(DistributedMember m) {
synchronized (latestViewLock) {
if (!shunnedMembers.containsKey(m))
return false;
// Make sure that the entry isn't stale...
long shunTime = ((Long)shunnedMembers.get(m)).longValue();
long now = System.currentTimeMillis();
if (shunTime + SHUNNED_SUNSET * 1000 > now)
return true;
// Oh, it _is_ stale. Remove it while we're here.
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
logger.debug("Membership: no longer shunning < {} >", m);
}
endShun(m);
return false;
} // sync
}
// public boolean isShunnedMember(IpAddress addr) {
// synchronized(latestViewLock) {
// InternalDistributedMember idm = getMemberFromIpAddress(addr, false);
// if (idm == null) {
// return false; // don't know about this member, so don't shun it
// }
// return isShunned(idm);
// }
// }
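/**
* Check the shunned-member map for the member associated with the given
* jgroups address without taking the view lock and without expiring stale
* entries (unlike {@link #isShunned(DistributedMember)}).
* @param addr the jgroups address of the member
* @return true if the corresponding member is currently in the shunned map
*/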
public boolean isShunnedMemberNoSync(IpAddress addr) {
InternalDistributedMember mbr = (InternalDistributedMember)ipAddrToMemberMap.get(addr);
if (mbr == null) {
return false;
}
return shunnedMembers.containsKey(mbr);
}
/**
* Indicate whether the given member is in the surprise member list
* <P>
* Unlike isShunned, this method will not cause expiry of a surprise member.
* That must be done during view processing.
* <p>
* Like isShunned, this method holds the view lock while executing
*
* @guarded.By latestViewLock
* @param m the member in question
* @return true if the given member is a surprise member
*/
public boolean isSurpriseMember(DistributedMember m) {
synchronized (latestViewLock) {
if (surpriseMembers.containsKey(m)) {
long birthTime = ((Long)surpriseMembers.get(m)).longValue();
long now = System.currentTimeMillis();
return (birthTime >= (now - this.surpriseMemberTimeout));
}
return false;
} // sync
}
/**
* for testing we need to be able to inject surprise members into
* the view to ensure that sunsetting works properly
* @param m the member ID to add
* @param birthTime the millisecond clock time that the member was first seen
*/
protected void addSurpriseMemberForTesting(DistributedMember m, long birthTime) {
if (logger.isDebugEnabled()) {
logger.debug("test hook is adding surprise member {} birthTime={}", m, birthTime);
}
synchronized(latestViewLock) {
surpriseMembers.put((InternalDistributedMember)m, Long.valueOf(birthTime));
}
}
/**
* returns the surpriseMemberTimeout interval, in milliseconds
*/
public int getSurpriseMemberTimeout() {
return this.surpriseMemberTimeout;
}
/**
* returns the shunned member sunset interval, in milliseconds
*/
public int getShunnedMemberTimeout() {
return SHUNNED_SUNSET * 1000;
}
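/**
* Remove the given member from the shunned and shunned-and-warned collections.
* @return true if the member had been shunned
*/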
private boolean endShun(DistributedMember m) {
boolean wasShunned = (shunnedMembers.remove(m) != null);
shunnedAndWarnedMembers.remove(m);
return wasShunned;
}
/**
* Add the given member to the shunned list. Also, purge any shunned
* members that are really really old.
* <p>
* Must be called with {@link #latestViewLock} held and
* the view stable.
*
* @param m the member to add
*/
protected void addShunnedMember(InternalDistributedMember m) {
long deathTime = System.currentTimeMillis() - SHUNNED_SUNSET * 1000;
surpriseMembers.remove(m); // for safety
// Update the shunned set.
if (!isShunned(m)) {
shunnedMembers.put(m, Long.valueOf(System.currentTimeMillis()));
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS))
logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: added shunned member < {} >", m);
}
// Remove really really old shunned members.
// First, make a copy of the old set. New arrivals _a priori_ don't matter,
// and we're going to be updating the list so we don't want to disturb
// the iterator.
Set oldMembers = new HashSet(shunnedMembers.entrySet());
HashSet removedMembers = new HashSet();
Iterator it = oldMembers.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry)it.next();
// Key is the member. Value is the time to remove it.
long ll = ((Long)e.getValue()).longValue();
if (ll >= deathTime)
continue; // too new.
InternalDistributedMember mm = (InternalDistributedMember)e.getKey();
if (latestView.contains(mm)) {
// Fault tolerance: a shunned member can conceivably linger but never
// disconnect.
//
// We may not delete it at the time that we shun it because the view
// isn't necessarily stable. (Note that a well-behaved cache member
// will depart on its own accord, but we force the issue here.)
destroyMember(mm, true, "shunned but never disconnected");
}
if (logger.isDebugEnabled())
logger.debug("Membership: finally removed shunned member entry <{}>", mm);
removedMembers.add(mm);
} // while
// Now remove these folk from the list...
synchronized (latestViewLock) {
it = removedMembers.iterator();
while (it.hasNext()) {
InternalDistributedMember idm = (InternalDistributedMember)it.next();
endShun(idm);
ipAddrToMemberMap.remove(new IpAddress(idm.getIpAddress(), idm.getPort()));
}
}
}
/**
* Retrieve thread-local data for transport to another thread in hydra
*/
public Object getThreadLocalData() {
Map result = new HashMap();
// if (dirackProtocol != null) {
// Object td = dirackProtocol.getThreadDataIfPresent();
// if (td != null) {
// result.put("DirAck", td);
// }
// }
return result;
}
/**
* Set thread-local data for hydra
*/
public void setThreadLocalData(Object data) {
Map dataMap = (Map)data;
Object td = dataMap.get("DirAck");
// if (td != null && dirackProtocol != null) {
// dirackProtocol.setThreadData(td);
// }
}
/**
* for testing verification purposes, this returns the port for the
* direct channel, or zero if there is no direct channel
*/
public int getDirectChannelPort() {
return directChannel == null? 0 : directChannel.getPort();
}
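/**
* Returns the throttle time of the serial execution queue for the member
* associated with the given jgroups address, or 0 if the member has no
* serial queue.
*/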
public int getSerialQueueThrottleTime(Address sender) {
IpAddress senderAddr = (IpAddress) sender;
//if (!serialQueueInitialized) { // no need to synchronize - queue is invariant
// serialQueueInitialized = true;
InternalDistributedMember member = getMemberFromIpAddress(senderAddr, true);
ThrottlingMemLinkedQueueWithDMStats serialQueue = listener.getDM()
.getSerialQueue(member);
//}
// return serialQueue != null && serialQueue.wouldBlock();
if (serialQueue == null)
return 0;
return serialQueue.getThrottleTime();
}
/**
* return an InternalDistributedMember representing the given jgroups address
* @param sender the jgroups address to resolve
* @param createIfAbsent if true, create a new member ID when none is known, caching it if the address is fully formed
* @return the IDM for the given jgroups address
*/
public InternalDistributedMember getMemberFromIpAddress(IpAddress sender,
boolean createIfAbsent) {
synchronized(latestViewLock) {
InternalDistributedMember mbr = (InternalDistributedMember)ipAddrToMemberMap.get(sender);
if (mbr == null && createIfAbsent) {
JGroupMember jgm = new JGroupMember(sender);
mbr = new InternalDistributedMember(jgm);
// if a fully formed address, retain it in the map for future use
if (sender.getVmKind() != 0) {
ipAddrToMemberMap.put(sender, mbr);
}
}
return mbr;
}
}
/* non-thread-owned serial channels and high priority channels are not
* included
*/
public HashMap getChannelStates(DistributedMember member, boolean includeMulticast) {
HashMap result = new HashMap();
Stub stub = (Stub)memberToStubMap.get(member);
DirectChannel dc = directChannel;
if (stub != null && dc != null) {
dc.getChannelStates(stub, result);
}
if (includeMulticast) {
result.put("JGroups.MCast", Long.valueOf(channel.getMulticastState()));
}
return result;
}
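/**
* Waits until the given member has caught up to the channel states captured
* earlier with {@link #getChannelStates}. The "JGroups.MCast" entry, if
* present, is waited on through the jgroups channel; the remaining entries
* are handled by the direct channel.
*/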
public void waitForChannelState(DistributedMember otherMember, HashMap channelState)
throws InterruptedException
{
if (Thread.interrupted()) throw new InterruptedException();
DirectChannel dc = directChannel;
Long mcastState = (Long)channelState.remove("JGroups.MCast");
Stub stub;
synchronized (latestViewLock) {
stub = (Stub)memberToStubMap.get(otherMember);
}
if (dc != null && stub != null) {
dc.waitForChannelState(stub, channelState);
}
if (mcastState != null) {
InternalDistributedMember idm = (InternalDistributedMember)otherMember;
JGroupMember jgm = (JGroupMember)idm.getNetMember();
Address other = jgm.getAddress();
channel.waitForMulticastState(other, mcastState.longValue());
}
}
/*
* (non-Javadoc)
* MembershipManager method: wait for the given member to be gone. Throws TimeoutException if
* the wait goes too long
* @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#waitForDeparture(com.gemstone.gemfire.distributed.DistributedMember)
*/
public boolean waitForDeparture(DistributedMember mbr) throws TimeoutException, InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
boolean result = false;
DirectChannel dc = directChannel;
InternalDistributedMember idm = (InternalDistributedMember)mbr;
Stub stub = new Stub(idm.getIpAddress(), idm.getPort(), idm.getVmViewId());
int memberTimeout = this.dconfig.getMemberTimeout();
long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10;
boolean wait;
int numWaits = 0;
do {
wait = false;
if (dc != null) {
if (dc.hasReceiversFor(stub)) {
wait = true;
}
if (wait && logger.isDebugEnabled()) {
logger.info("waiting for receivers for {} to shut down", mbr);
}
}
if (!wait) {
synchronized(latestViewLock) {
wait = this.latestView.contains(idm);
}
if (wait && logger.isDebugEnabled()) {
logger.debug("waiting for {} to leave the membership view", mbr);
}
}
if (!wait) {
// run a message through the member's serial execution queue to ensure that all of its
// current messages have been processed
ThrottlingMemLinkedQueueWithDMStats serialQueue = listener.getDM().getSerialQueue(idm);
if (serialQueue != null) {
final boolean done[] = new boolean[1];
final FlushingMessage msg = new FlushingMessage(done);
serialQueue.add(new SizeableRunnable(100) {
public void run() {
msg.invoke();
}
public String toString() {
return "Processing fake message";
}
});
synchronized(done) {
while (done[0] == false) {
done.wait(10);
}
result = true;
}
}
}
if (wait) {
numWaits++;
if (numWaits > 40) {
// waited over 4 * memberTimeout ms. Give up at this point
throw new TimeoutException("waited too long for " + idm + " to be removed");
}
Thread.sleep(pauseTime);
}
} while (wait && (this.channel.isOpen() || (dc != null && dc.isOpen())) );
if (logger.isDebugEnabled()) {
logger.debug("operations for {} should all be in the cache at this point", mbr);
}
return result;
}
/** return the distribution config used to instantiate this membership manager */
public DistributionConfig getDistributionConfig() {
return this.dconfig;
}
/**
* check whether the given jgroups address corresponds to a current member
* @param mbr the JGroups address of the member
* @return true if the member is present in the current membership view
*/
public boolean memberExists(IpAddress mbr) {
synchronized(latestViewLock) {
InternalDistributedMember idm = getMemberFromIpAddress(mbr, true);
if (idm == null) {
return true; // TODO I don't think this happens
}
return memberExists(idm);
}
}
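/**
* Resolves the given jgroups address to a member (under the view lock) and
* delegates to the member-based warnShun.
*/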
public void warnShun(IpAddress mbr) {
InternalDistributedMember idm;
synchronized(latestViewLock) {
idm = getMemberFromIpAddress(mbr, true);
}
if (idm == null) {
return;
}
warnShun(idm);
}
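/**
* Checks whether the given member is in the current membership view. If it
* is not, a latch is registered in {@link #memberLatch} and this method
* waits up to membershipCheckTimeout ms for the latch to be opened before
* giving up.
* @return true if the member was found in the view or the latch opened in time
*/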
public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
boolean foundRemoteId = false;
Latch currentLatch = null;
// ARB: preconditions
// remoteId != null
synchronized (latestViewLock) {
if (latestView == null) {
// Not sure how this would happen, but see bug 38460.
// No view?? Not found!
}
else if (latestView.contains(remoteId)) {
// ARB: check if remoteId is already in membership view.
// If not, then create a latch if needed and wait for the latch to open.
foundRemoteId = true;
}
else if ((currentLatch = (Latch)this.memberLatch.get(remoteId)) == null) {
currentLatch = new Latch();
this.memberLatch.put(remoteId, currentLatch);
}
} // synchronized
if (!foundRemoteId) {
// ARB: wait up to membershipCheckTimeout ms for the latch to open.
// if-stmt precondition: currentLatch is non-null
try {
if (currentLatch.attempt(membershipCheckTimeout)) {
foundRemoteId = true;
// @todo
// ARB: remove latch from memberLatch map if this is the last thread waiting on latch.
}
}
catch (InterruptedException ex) {
// ARB: latch attempt was interrupted.
Thread.currentThread().interrupt();
logger.warn(LocalizedMessage.create(
LocalizedStrings.JGroupMembershipManager_THE_MEMBERSHIP_CHECK_WAS_TERMINATED_WITH_AN_EXCEPTION));
}
}
// ARB: postconditions
// (foundRemoteId == true) ==> (currentLatch is non-null ==> currentLatch is open)
return foundRemoteId;
}
/* returns the cause of shutdown, if known */
public Throwable getShutdownCause() {
return this.shutdownCause;
}
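/**
* Test hook: installs a JChannel test hook, replacing any previously
* registered hook.
*/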
public void registerTestHook(JChannelTestHook mth) {
this.channelTestHook = mth;
}
public void unregisterTestHook(JChannelTestHook mth) {
this.channelTestHook = null;
}
public void registerTestHook(MembershipTestHook mth) {
// synchronize additions to avoid races during startup
synchronized(this.latestViewLock) {
if (this.membershipTestHooks == null) {
this.membershipTestHooks = Collections.singletonList(mth);
}
else {
List l = new ArrayList(this.membershipTestHooks);
l.add(mth);
this.membershipTestHooks = l;
}
}
}
public void unregisterTestHook(MembershipTestHook mth) {
synchronized(this.latestViewLock) {
if (this.membershipTestHooks != null) {
if (this.membershipTestHooks.size() == 1) {
this.membershipTestHooks = null;
}
else {
List l = new ArrayList(this.membershipTestHooks);
l.remove(mth);
this.membershipTestHooks = l;
}
}
}
}
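// state flags for the beSick/playDead/beHealthy test hooks below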
boolean beingSick;
boolean playingDead;
/**
* Test hook - be a sick member
*/
protected synchronized void beSick() {
if (!beingSick) {
beingSick = true;
if (logger.isDebugEnabled()) {
logger.debug("JGroupMembershipManager.beSick invoked for {} - simulating sickness", this.myMemberId);
}
fdProtocol.beSick();
// close current connections and stop accepting new ones
fdSockProtocol.beSick();
if (directChannel != null) {
directChannel.beSick();
}
}
}
/**
* Test hook - don't answer "are you alive" requests
*/
protected synchronized void playDead() {
if (!playingDead) {
playingDead = true;
if (logger.isDebugEnabled()) {
logger.debug("JGroupMembershipManager.playDead invoked for {}", this.myMemberId);
}
// close current connections and stop accepting new ones
verifySuspectProtocol.playDead(true);
fdProtocol.beSick();
fdSockProtocol.beSick();
}
}
/**
* Test hook - recover health
*/
protected synchronized void beHealthy() {
if (beingSick || playingDead) {
beingSick = false;
playingDead = false;
if (logger.isDebugEnabled()) {
logger.debug("JGroupMembershipManager.beHealthy invoked for {} - recovering health now", this.myMemberId);
}
fdSockProtocol.beHealthy();
fdProtocol.beHealthy();
if (directChannel != null) {
directChannel.beHealthy();
}
verifySuspectProtocol.playDead(false);
}
}
/**
* Test hook
*/
public boolean isBeingSick() {
return this.beingSick;
}
/**
* Test hook - inhibit ForcedDisconnectException logging to keep dunit logs clean
* @param b true to inhibit forced-disconnect logging, false to restore it
*/
public static void inhibitForcedDisconnectLogging(boolean b) {
inhibitForceDisconnectLogging = b;
}
/**
* Sets the default VM pid used in {@link MemberAttributes} to the given unique id.
* @param uniqueID the value to install as the default VM pid
*/
public void setUniqueID(int uniqueID) {
MemberAttributes.setDefaultVmPid(uniqueID);
}
/**
* @return the nakack protocol if it exists
*/
public NAKACK getNakAck() {
return this.nakAckProtocol;
}
/** this is a fake message class that is used to flush the serial execution queue */
static class FlushingMessage extends DistributionMessage {
boolean[] done;
FlushingMessage(boolean[] done) {
this.done = done;
}
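/**
* Invoked when the fake message reaches the front of the serial execution
* queue; marks it done and wakes the thread waiting in waitForDeparture.
*/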
public void invoke() {
synchronized(done) {
done[0] = true;
done.notify();
}
}
protected void process(DistributionManager dm) {
// not used
}
public int getDSFID() {
return 0;
}
public int getProcessorType() {
return DistributionManager.SERIAL_EXECUTOR;
}
}
/**
* Sets cache time offset in {@link DistributionManager}.
*
* @param src the jgroups address of the member that supplied the offset
* @param timeOffset the cache time offset to install
* @param isJoin true if the offset is being applied as part of joining the system
* @see InternalDistributedSystem#getClock()
* @see DSClock#setCacheTimeOffset(DistributedMember, long, boolean)
*/
public void setCacheTimeOffset(Address src, long timeOffset, boolean isJoin) {
// check if offset calculator is still in view
InternalDistributedMember coord = (InternalDistributedMember) ipAddrToMemberMap
.get(src);
if (coord == null && src != null) {
JGroupMember jgm = new JGroupMember((IpAddress)src);
coord = new InternalDistributedMember(jgm);
}
if (this.listener != null) {
DistributionManager dm = this.listener.getDM();
dm.getSystem().getClock().setCacheTimeOffset(coord, timeOffset, isJoin);
}
}
/**
* returns the general purpose membership timer
*/
public Timer getTimer() {
return this.timer;
}
}