blob: 52e63276a806cd8579b3ca4dd13ecab3436fc336 [file] [log] [blame]
/** Notice of modification as required by the LGPL
* This file was modified by Gemstone Systems Inc. on
* $Date$
**/
// $Id: JChannel.java,v 1.44 2005/11/08 13:57:08 belaban Exp $
package com.gemstone.org.jgroups;
import com.gemstone.org.jgroups.util.GemFireTracer;
import com.gemstone.org.jgroups.util.ExternalStrings;
//import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Element;
import com.gemstone.org.jgroups.conf.ConfiguratorFactory;
import com.gemstone.org.jgroups.conf.ProtocolStackConfigurator;
import com.gemstone.org.jgroups.protocols.pbcast.NAKACK;
import com.gemstone.org.jgroups.spi.GFBasicAdapter;
import com.gemstone.org.jgroups.spi.GFPeerAdapter;
import com.gemstone.org.jgroups.stack.GFBasicAdapterImpl;
import com.gemstone.org.jgroups.stack.ProtocolStack;
import com.gemstone.org.jgroups.stack.StateTransferInfo;
import com.gemstone.org.jgroups.util.*;
import java.io.File;
import java.io.Serializable;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* JChannel is a pure Java implementation of Channel.
* When a JChannel object is instantiated it automatically sets up the
* protocol stack.
* <p>
* <B>Properties</B>
* <P>
* Properties are used to configure a channel, and are accepted in
* several forms; the String form is described here.
* A property string consists of a number of properties separated by
* colons. For example:
* <p>
* <pre>"&lt;prop1&gt;(arg1=val1):&lt;prop2&gt;(arg1=val1;arg2=val2):&lt;prop3&gt;:&lt;propn&gt;"</pre>
* <p>
* Each property relates directly to a protocol layer, which is
* implemented as a Java class. When a protocol stack is to be created
* based on the above property string, the first property becomes the
* bottom-most layer, the second one will be placed on the first, etc.:
* the stack is created from the bottom to the top, as the string is
* parsed from left to right. Each property has to be the name of a
* Java class that resides in the
* <code>com.gemstone.org.jgroups.protocols}</code> package.
* <p>
* Note that only the base name has to be given, not the fully specified
* class name (e.g., UDP instead of com.gemstone.org.jgroups.protocols.UDP).
* <p>
* Each layer may have 0 or more arguments, which are specified as a
* list of name/value pairs in parentheses directly after the property.
* In the example above, the first protocol layer has 1 argument,
* the second 2, the third none. When a layer is created, these
* properties (if there are any) will be set in a layer by invoking
* the layer's setProperties() method
* <p>
* As an example the property string below instructs JGroups to create
* a JChannel with protocols UDP, PING, FD and GMS:<p>
* <pre>"UDP(mcast_addr=228.10.9.8;mcast_port=5678):PING:FD:GMS"</pre>
* <p>
* The UDP protocol layer is at the bottom of the stack, and it
* should use mcast address 228.10.9.8. and port 5678 rather than
* the default IP multicast address and port. The only other argument
* instructs FD to output debug information while executing.
* Property UDP refers to a JGroups class ,
* that is subsequently loaded and an instance of which is created as protocol layer.
* If any of these classes are not found, an exception will be thrown and
* the construction of the stack will be aborted.
*
* @see com.gemstone.org.jgroups.protocols.UDP
* @author Bela Ban
* @author Filip Hanik
* @version $Revision: 1.44 $
*/
public class JChannel extends Channel {
/**
* The default protocol stack used by the default constructor.
*/
public static final String DEFAULT_PROTOCOL_STACK=
"UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
"PING(timeout=3000;num_initial_members=6):" +
"FD(timeout=3000):" +
"VERIFY_SUSPECT(timeout=1500):" +
"pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800):" +
"UNICAST(timeout=600,1200,2400,4800):" +
"pbcast.STABLE(desired_avg_gossip=10000):" +
"FRAG:" +
"pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
"shun=true;print_local_addr=true)";
static final String FORCE_PROPS="force.properties";
/* the protocol stack configuration string */
private String props=null;
/*the address of this JChannel instance*/
protected/*GemStoneAddition*/ Address local_addr=null;
/*the channel (also know as group) name*/
protected/*GemStoneAddition*/ String channel_name=null; // group name
/*the latest view of the group membership*/
private View my_view=null;
/*the queue that is used to receive messages (events) from the protocol stack*/
protected/*GemStoneAddition*/ final Queue mq=new Queue();
/*the protocol stack, used to send and receive messages from the protocol stack*/
protected ProtocolStack prot_stack=null;
/** Thread responsible for closing a channel and potentially reconnecting to it (e.g., when shunned).
*
* GuardedBy this
*/
protected CloserThread closer=null;
/** To wait until a local address has been assigned */
private final Promise local_addr_promise=new Promise();
/** To wait until we have connected successfully */
protected/*GemStoneAddition*/ final Promise connect_promise=new Promise();
// GemStoneAddition - fix for bug 39859 - hang in JChannel.disconnect()
// /** To wait until we have been disconnected from the channel */
// private final Promise disconnect_promise=new Promise();
private final Promise state_promise=new Promise();
private final Object suspend_mutex=new Object();
private boolean suspended=false;
/** wait until we have a non-null local_addr */
private long LOCAL_ADDR_TIMEOUT=30000; //=Long.parseLong(System.getProperty("local_addr.timeout", "30000"));
/*if the states is fetched automatically, this is the default timeout, 5 secs*/
private static final long GET_STATE_DEFAULT_TIMEOUT=5000;
/*flag to indicate whether to receive views from the protocol stack*/
private boolean receive_views=true;
/*flag to indicate whether to receive suspect messages*/
private boolean receive_suspects=true;
/*flag to indicate whether to receive blocks, if this is set to true, receive_views is set to true*/
private boolean receive_blocks=false;
/*flag to indicate whether to receive local messages
*if this is set to false, the JChannel will not receive messages sent by itself*/
private boolean receive_local_msgs=true;
/*flag to indicate whether to receive a state message or not*/
private boolean receive_get_states=false;
/*flag to indicate whether the channel will reconnect (reopen) when the exit message is received*/
protected/*GemStoneAddition*/ boolean auto_reconnect=false;
/*flag t indicate whether the state is supposed to be retrieved after the channel is reconnected
*setting this to true, automatically forces auto_reconnect to true*/
protected/*GemStoneAddition*/ boolean auto_getstate=false;
/*channel connected flag*/
protected/*GemStoneAddition*/ volatile/*GemStoneAddition*/ boolean connected=false;
/** block send()/down() if true (unlocked by UNBLOCK_SEND event) */
private final CondVar block_sending=new CondVar("block_sending", Boolean.FALSE);
/*channel closed flag. GemStoneAddition: volatile*/
private volatile boolean closed=false; // close() has been called, channel is unusable
/** GemStoneAddition - are we closing the channel? */
private volatile boolean closing = false;
/** GemStoneAddition - exception in closing channel */
protected volatile RuntimeException closeException = null;
/** True if a state transfer protocol is available, false otherwise */
private boolean state_transfer_supported=false; // set by CONFIG event from STATE_TRANSFER protocol
/** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be remove
* as soon as JGroups supports logical addresses
*/
protected/*GemStoneAddition*/ byte[] additional_data=null;
protected final GemFireTracer log=GemFireTracer.getLog(getClass());
/** Collect statistics */
protected boolean stats=true;
protected long sent_msgs=0, received_msgs=0, sent_bytes=0, received_bytes=0;
public Event exitEvent;
/**
* default serialization and exception functions, used if a
* JGroups Channel hasn't been created
*/
private static GFBasicAdapter gfFunctions = new GFBasicAdapterImpl();
/** serialization and exceptions adapter for GemFire */
public static GFBasicAdapter getGfFunctions() {
return gfFunctions;
}
/**
* Establishes non-default serialization and exception functions.
*/
public static void setDefaultGFFunctions(GFBasicAdapter functions) {
gfFunctions = functions;
}
/**
* Constructs a <code>JChannel</code> instance with the protocol stack
* specified by the <code>DEFAULT_PROTOCOL_STACK</code> member.
*
* @throws ChannelException if problems occur during the initialization of
* the protocol stack.
*/
public JChannel() throws ChannelException {
//this(DEFAULT_PROTOCOL_STACK);
// GemStoneAddition - this is stubbed out for testing purposes
}
/**
* Constructs a <code>JChannel</code> instance with the protocol stack
* configuration contained by the specified file.
*
* @param properties a file containing a JGroups XML protocol stack
* configuration.
*
* @throws ChannelException if problems occur during the configuration or
* initialization of the protocol stack.
*/
public JChannel(File properties) throws ChannelException {
this(ConfiguratorFactory.getStackConfigurator(properties));
}
/**
* Constructs a <code>JChannel</code> instance with the protocol stack
* configuration contained by the specified XML element.
*
* @param properties a XML element containing a JGroups XML protocol stack
* configuration.
*
* @throws ChannelException if problems occur during the configuration or
* initialization of the protocol stack.
*/
public JChannel(Element properties) throws ChannelException {
this(ConfiguratorFactory.getStackConfigurator(properties));
}
/**
* Constructs a <code>JChannel</code> instance with the protocol stack
* configuration indicated by the specified URL.
*
* @param properties a URL pointing to a JGroups XML protocol stack
* configuration.
*
* @throws ChannelException if problems occur during the configuration or
* initialization of the protocol stack.
*/
public JChannel(URL properties) throws ChannelException {
this(ConfiguratorFactory.getStackConfigurator(properties));
}
/**
* Constructs a <code>JChannel</code> instance with the protocol stack
* configuration based upon the specified properties parameter.
*
* @param properties an old style property string, a string representing a
* system resource containing a JGroups XML configuration,
* a string representing a URL pointing to a JGroups XML
* XML configuration, or a string representing a file name
* that contains a JGroups XML configuration.
*
* @throws ChannelException if problems occur during the configuration and
* initialization of the protocol stack.
*/
public JChannel(String properties) throws ChannelException {
this(ConfiguratorFactory.getStackConfigurator(properties));
}
/**
* Constructs a <code>JChannel</code> instance with the protocol stack
* configuration contained by the protocol stack configurator parameter.
* <p>
* All of the public constructors of this class eventually delegate to this
* method.
*
* @param configurator a protocol stack configurator containing a JGroups
* protocol stack configuration.
*
* @throws ChannelException if problems occur during the initialization of
* the protocol stack.
*/
protected JChannel(ProtocolStackConfigurator configurator) throws ChannelException {
props = configurator.getProtocolStackString();
if (this.log.isDebugEnabled()) {
this.log.debug("Configuring JGroups stack with '" + props + "'");
}
/*create the new protocol stack*/
prot_stack=new ProtocolStack(this, props);
/* Setup protocol stack (create layers, queues between them */
try {
prot_stack.setup();
} catch (Exception e) {
throw new ChannelException("unable to setup the protocol stack", e);
}
}
/**
* Creates a new JChannel with the protocol stack as defined in the properties
* parameter. an example of this parameter is<BR>
* "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>
* Other examples can be found in the ./conf directory<BR>
* @param properties the protocol stack setup; if null, the default protocol stack will be used.
* The properties can also be a java.net.URL object or a string that is a URL spec.
* The JChannel will validate any URL object and String object to see if they are a URL.
* In case of the parameter being a url, the JChannel will try to load the xml from there.
* In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the
* DOM tree with the element as its root element.
* @deprecated Use the constructors with specific parameter types instead.
*/
@Deprecated // GemStoneAddition
public JChannel(Object properties) throws ChannelException {
if (properties == null) {
properties = DEFAULT_PROTOCOL_STACK;
}
try {
ProtocolStackConfigurator c=ConfiguratorFactory.getStackConfigurator(properties);
props=c.getProtocolStackString();
}
catch(Exception x) {
throw new ChannelException("unable to load protocol stack", x);
}
/*create the new protocol stack*/
prot_stack=new ProtocolStack(this, props);
/* Setup protocol stack (create layers, queues between them */
try {
prot_stack.setup();
} catch (Exception e) {
throw new ChannelException("failed to setup protocol stack", e);
}
}
public JChannel(String properties, GFBasicAdapter jgBasicAdapter, GFPeerAdapter jgPeerAdapter,
boolean enableClockStats, boolean enableJgStackStats) throws ChannelException {
if (properties == null) {
properties = DEFAULT_PROTOCOL_STACK;
}
try {
ProtocolStackConfigurator c=ConfiguratorFactory.getStackConfigurator(properties);
props=c.getProtocolStackString();
}
catch(Exception x) {
throw new ChannelException("unable to load protocol stack", x);
}
/*create the new protocol stack*/
prot_stack=new ProtocolStack(this, props);
prot_stack.gfPeerFunctions = jgPeerAdapter;
prot_stack.gfBasicFunctions = jgBasicAdapter;
JChannel.gfFunctions = jgBasicAdapter; // used primarily for serialization enhancements
prot_stack.enableClockStats = enableClockStats;
prot_stack.enableJgStackStats = enableJgStackStats;
/* Setup protocol stack (create layers, queues between them */
try {
prot_stack.setup();
} catch (Exception e) {
throw new ChannelException("failed to setup protocol stack", e);
}
}
/**
* Returns the protocol stack.
* Currently used by Debugger.
* Specific to JChannel, therefore
* not visible in Channel
*/
public ProtocolStack getProtocolStack() {
return prot_stack;
}
@Override // GemStoneAddition
protected GemFireTracer getLog() {
return log;
}
/**
* returns the protocol stack configuration in string format.
* an example of this property is<BR>
* "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"
*/
public String getProperties() {
return props;
}
public boolean statsEnabled() {
return stats;
}
public void enableStats(boolean stats) {
this.stats=stats;
}
public void resetStats() {
sent_msgs=received_msgs=sent_bytes=received_bytes=0;
}
public long getSentMessages() {return sent_msgs;}
public long getSentBytes() {return sent_bytes;}
public long getReceivedMessages() {return received_msgs;}
public long getReceivedBytes() {return received_bytes;}
public int getNumberOfTasksInTimer() {return prot_stack != null ? prot_stack.timer.size() : -1;}
public String dumpTimerQueue() {
return prot_stack != null? prot_stack.dumpTimerQueue() : "<n/a";
}
/**
* Returns a pretty-printed form of all the protocols. If include_properties is set,
* the properties for each protocol will also be printed.
*/
public String printProtocolSpec(boolean include_properties) {
return prot_stack != null ? prot_stack.printProtocolSpec(include_properties) : null;
}
/**
* Connects the channel to a group.
* If the channel is already connected, an error message will be printed to the error log.
* If the channel is closed a ChannelClosed exception will be thrown.
* This method starts the protocol stack by calling ProtocolStack.start,
* then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event.
* Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified
* and the channel is considered connected.
*
* @param channel_name A <code>String</code> denoting the group name. Cannot be null.
* @exception ChannelException The protocol stack cannot be started
* @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
* A new channel has to be created first.
*/
@Override // GemStoneAddition
// GemStoneAddition - removed synchronization to avoid deadlock with PingSender
// in the event a GossipServer can't be found and an exit must be forced
public void connect(String channel_name) throws ChannelException, ChannelClosedException {
this.exitEvent = null; // GemStoneAddition
/*make sure the channel is not closed*/
checkClosed();
/*if we already are connected, then ignore this*/
if(connected) {
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_ALREADY_CONNECTED_TO__0, channel_name);
return;
}
/*make sure we have a valid channel name*/
if(channel_name == null) {
if(log.isInfoEnabled()) log.info(ExternalStrings.JChannel_CHANNEL_NAME_IS_NULL_ASSUMING_UNICAST_CHANNEL);
}
else
this.channel_name=channel_name;
try {
prot_stack.startStack(); // calls start() in all protocols, from top to bottom
} catch (Exception e) {
throw new ChannelException("error connecting distribution channel", e);
}
/* try to get LOCAL_ADDR_TIMEOUT. Catch SecurityException if called in an untrusted environment (e.g. using JNLP) */
try {
LOCAL_ADDR_TIMEOUT=Long.parseLong(System.getProperty("local_addr.timeout","30000"));
}
catch (SecurityException e1) {
/* Use the default value specified above*/
}
/* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
if(local_addr == null) {
log.fatal("local_addr is null; cannot connect");
throw new ChannelException("local_addr is null");
}
/*create a temporary view, assume this channel is the only member and
*is the coordinator*/
Vector t=new Vector(1);
t.addElement(local_addr);
my_view=new View(local_addr, 0, t); // create a dummy view
// only connect if we are not a unicast channel
if(channel_name != null) {
connect_promise.reset();
Event connect_event=new Event(Event.CONNECT, channel_name);
down(connect_event);
connect_promise.getResult(); // waits forever until connected (or channel is closed)
}
// GemStoneAddition - connect/close race & exception throwing
if (!closing && !closed) {
/*notify any channel listeners*/
connected=true;
if(channel_listener != null)
channel_listener.channelConnected(this);
} else {
if (closeException != null) {
throw closeException;
} else {
String s = "Failure during connect (closing = " + closing
+ ", closed = " + closed + ")";
throw new ChannelException(s);
}
}
/*notify any channel listeners*/
connected=true;
notifyChannelConnected(this);
}
/**
* Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
* Otherwise the following actions happen in the listed order<BR>
* <ol>
* <li> The JChannel sends a DISCONNECT event down the protocol stack<BR>
* <li> Blocks until the channel to receives a DISCONNECT_OK event<BR>
* <li> Sends a STOP_QUEING event down the stack<BR>
* <li> Stops the protocol stack by calling ProtocolStack.stop()<BR>
* <li> Notifies the listener, if the listener is available<BR>
* </ol>
*/
@Override // GemStoneAddition
public synchronized void disconnect() {
if(closed) return;
resume();
if(connected) {
if(channel_name != null) {
/* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
* DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
* DISCONNECT_OK has been received, or until timeout has elapsed.
*/
Event disconnect_event=new Event(Event.DISCONNECT, local_addr);
// GemStoneAddition - the disconnect_promise isn't needed when
// there are no down-threads between JChannel and GMS
//disconnect_promise.reset();
down(disconnect_event); // DISCONNECT is handled by each layer
//disconnect_promise.getResult(); // wait for DISCONNECT_OK
}
// Just in case we use the QUEUE protocol and it is still blocked...
down(new Event(Event.STOP_QUEUEING));
connected=false;
try {
prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
}
catch(Exception e) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", e);
}
notifyChannelDisconnected(this);
init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
}
}
/**
* Destroys the channel.
* After this method has been called, the channel us unusable.<BR>
* This operation will disconnect the channel and close the channel receive queue immediately<BR>
*/
@Override // GemStoneAddition
public synchronized void close() {
closing = true; // GemStoneAddition
_close(true, true); // by default disconnect before closing channel and close mq
// GemstoneAddition - wait for any closer thread to finish before returning
// so the caller can be assured that the channel is really closed
if (this.closer != null) {
try {
this.closer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* GemStoneAddition - close w/o waiting for closer thread
*/
public synchronized void closeAsync() {
closing = true;
_close(true, true);
}
/** Shuts down the channel without disconnecting */
@Override // GemStoneAddition
public synchronized void shutdown() {
_close(false, true); // by default disconnect before closing channel and close mq
}
/**
* Opens the channel.
* This does the following actions:
* <ol>
* <li> Resets the receiver queue by calling Queue.reset
* <li> Sets up the protocol stack by calling ProtocolStack.setup
* <li> Sets the closed flag to false
* </ol>
*/
@Override // GemStoneAddition
public synchronized void open() throws ChannelException {
if(!closed)
throw new ChannelException("channel is already open");
try {
mq.reset();
// new stack is created on open() - bela June 12 2003
prot_stack=new ProtocolStack(this, props);
prot_stack.setup();
closed=false;
}
catch(Exception e) {
throw new ChannelException("failed to open channel" , e);
}
}
/**
* returns true if the Open operation has been called successfully
*/
@Override // GemStoneAddition
public boolean isOpen() {
return !closed;
}
/**
* returns true if the Connect operation has been called successfully
*/
@Override // GemStoneAddition
public boolean isConnected() {
return connected;
}
@Override // GemStoneAddition
public int getNumMessages() {
return mq != null? mq.size() : -1;
}
@Override // GemStoneAddition
public String dumpQueue() {
return Util.dumpQueue(mq);
}
/**
* Returns a map of statistics of the various protocols and of the channel itself.
* @return Map key=String, value=Map. A map where the keys are the protocols ("channel" pseudo key is
* used for the channel itself") and the values are property maps.
*/
@Override // GemStoneAddition
public Map dumpStats() {
Map retval=prot_stack.dumpStats();
if(retval != null) {
Map tmp=dumpChannelStats();
if(tmp != null)
retval.put("channel", tmp);
}
return retval;
}
private Map dumpChannelStats() {
Map retval=new HashMap();
retval.put("sent_msgs", Long.valueOf(sent_msgs));
retval.put("sent_bytes", Long.valueOf(sent_bytes));
retval.put("received_msgs", Long.valueOf(received_msgs));
retval.put("received_bytes", Long.valueOf(received_bytes));
return retval;
}
/**
* Sends a message through the protocol stack.
* Implements the Transport interface.
*
* @param msg the message to be sent through the protocol stack,
* the destination of the message is specified inside the message itself
* @exception ChannelNotConnectedException
* @exception ChannelClosedException
*/
@Override // GemStoneAddition
public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
checkClosed();
checkNotConnected();
if(stats) {
sent_msgs++;
sent_bytes+=msg.getLength();
}
down(new Event(Event.MSG, msg));
}
/**
* creates a new message with the destination address, and the source address
* and the object as the message value
* @param dst - the destination address of the message, null for all members
* @param src - the source address of the message
* @param obj - the value of the message
* @exception ChannelNotConnectedException
* @exception ChannelClosedException
* @see JChannel#send(Message)
*/
@Override // GemStoneAddition
public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException, ChannelClosedException {
send(new Message(dst, src, obj));
}
/**
* Blocking receive method.
* This method returns the object that was first received by this JChannel and that has not been
* received before. After the object is received, it is removed from the receive queue.<BR>
* If you only want to inspect the object received without removing it from the queue call
* JChannel.peek<BR>
* If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>
* By specifying a timeout of 0, the operation blocks forever, or until a message has been received.
* @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever
* @exception TimeoutException if a timeout occured prior to a new message was received
* @exception ChannelNotConnectedException
* @exception ChannelClosedException
* @see JChannel#peek
*/
@Override // GemStoneAddition
public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
Object retval=null;
Event evt;
checkClosed();
checkNotConnected();
try {
evt=(timeout <= 0) ? (Event)mq.remove() : (Event)mq.remove(timeout);
retval=getEvent(evt);
evt=null;
if(stats) {
if(retval != null && retval instanceof Message) {
received_msgs++;
received_bytes+=((Message)retval).getLength();
}
}
return retval;
}
catch(QueueClosedException queue_closed) {
throw new ChannelClosedException();
}
catch(TimeoutException t) {
throw t;
}
catch(Exception e) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", e);
return null;
}
}
/**
* Just peeks at the next message, view or block. Does <em>not</em> install
* new view if view is received<BR>
* Does the same thing as JChannel.receive but doesn't remove the object from the
* receiver queue
*/
@Override // GemStoneAddition
public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException {
Object retval=null;
Event evt;
checkClosed();
checkNotConnected();
try {
evt=(timeout <= 0) ? (Event)mq.peek() : (Event)mq.peek(timeout);
retval=getEvent(evt);
evt=null;
return retval;
}
catch(QueueClosedException queue_closed) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", queue_closed);
return null;
}
catch(TimeoutException t) {
return null;
}
catch(Exception e) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", e);
return null;
}
}
/**
* Returns the current view.
* <BR>
* If the channel is not connected or if it is closed it will return null.
* <BR>
* @return returns the current group view, or null if the channel is closed or disconnected
*/
@Override // GemStoneAddition
public View getView() {
return closed || !connected ? null : my_view;
}
/**
* returns the local address of the channel
* returns null if the channel is closed
*/
@Override // GemStoneAddition
public Address getLocalAddress() {
return closed ? null : local_addr;
}
/**
* returns the name of the channel
* if the channel is not connected or if it is closed it will return null
*/
@Override // GemStoneAddition
public String getChannelName() {
return closed ? null : !connected ? null : channel_name;
}
/**
* Sets a channel option. The options can be one of the following:
* <UL>
* <LI> Channel.BLOCK
* <LI> Channel.VIEW
* <LI> Channel.SUSPECT
* <LI> Channel.LOCAL
* <LI> Channel.GET_STATE_EVENTS
* <LI> Channel.AUTO_RECONNECT
* <LI> Channel.AUTO_GETSTATE
* </UL>
* <P>
* There are certain dependencies between the options that you can set,
* I will try to describe them here.
* <P>
* Option: Channel.VIEW option<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true the JChannel will receive VIEW change events<BR>
*<BR>
* Option: Channel.SUSPECT<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true the JChannel will receive SUSPECT events<BR>
*<BR>
* Option: Channel.BLOCK<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true will set setOpt(VIEW, true) and the JChannel will receive BLOCKS and VIEW events<BR>
*<BR>
* Option: GET_STATE_EVENTS<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true the JChannel will receive state events<BR>
*<BR>
* Option: LOCAL<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true the JChannel will receive messages that it self sent out.<BR>
*<BR>
* Option: AUTO_RECONNECT<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true and the JChannel will try to reconnect when it is being closed<BR>
*<BR>
* Option: AUTO_GETSTATE<BR>
* Value: java.lang.Boolean<BR>
* Result: set to true, the AUTO_RECONNECT will be set to true and the JChannel will try to get the state after a close and reconnect happens<BR>
* <BR>
*
* @param option the parameter option Channel.VIEW, Channel.SUSPECT, etc
* @param value the value to set for this option
*
*/
@Override // GemStoneAddition
public void setOpt(int option, Object value) {
if(closed) {
if(log.isWarnEnabled()) log.warn("channel is closed; option not set !");
return;
}
switch(option) {
case VIEW:
if(value instanceof Boolean)
receive_views=((Boolean)value).booleanValue();
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
break;
case SUSPECT:
if(value instanceof Boolean)
receive_suspects=((Boolean)value).booleanValue();
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
break;
case BLOCK:
if(value instanceof Boolean)
receive_blocks=((Boolean)value).booleanValue();
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
if(receive_blocks)
receive_views=true;
break;
case GET_STATE_EVENTS:
if(value instanceof Boolean)
receive_get_states=((Boolean)value).booleanValue();
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
break;
case LOCAL:
if(value instanceof Boolean)
receive_local_msgs=((Boolean)value).booleanValue();
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
break;
case AUTO_RECONNECT:
if(value instanceof Boolean)
auto_reconnect=((Boolean)value).booleanValue();
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
break;
case AUTO_GETSTATE:
if(value instanceof Boolean) {
auto_getstate=((Boolean)value).booleanValue();
if(auto_getstate)
auto_reconnect=true;
}
else
if(log.isErrorEnabled()) log.error("option " + Channel.option2String(option) +
" (" + value + "): value has to be Boolean");
break;
default:
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_OPTION__0__NOT_KNOWN, Channel.option2String(option));
break;
}
}
/**
* returns the value of an option.
* @param option the option you want to see the value for
* @return the object value, in most cases java.lang.Boolean
* @see JChannel#setOpt
*/
@Override // GemStoneAddition
public Object getOpt(int option) {
switch(option) {
case VIEW:
// return Boolean.valueOf(receive_views);
return receive_views ? Boolean.TRUE : Boolean.FALSE;
case BLOCK:
// return Boolean.valueOf(receive_blocks);
return receive_blocks ? Boolean.TRUE : Boolean.FALSE;
case SUSPECT:
// return Boolean.valueOf(receive_suspects);
return receive_suspects ? Boolean.TRUE : Boolean.FALSE;
case GET_STATE_EVENTS:
// return Boolean.valueOf(receive_get_states);
return receive_get_states ? Boolean.TRUE : Boolean.FALSE;
case LOCAL:
// return Boolean.valueOf(receive_local_msgs);
return receive_local_msgs ? Boolean.TRUE : Boolean.FALSE;
default:
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_OPTION__0__NOT_KNOWN, Channel.option2String(option));
return null;
}
}
/**
* Called to acknowledge a block() (callback in <code>MembershipListener</code> or
* <code>BlockEvent</code> received from call to <code>receive()</code>).
* After sending blockOk(), no messages should be sent until a new view has been received.
* Calling this method on a closed channel has no effect.
*/
@Override // GemStoneAddition
public void blockOk() {
down(new Event(Event.BLOCK_OK));
down(new Event(Event.START_QUEUEING));
}
/**
* Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
* Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
* milliseconds have elapsed. The argument of GET_STATE_OK should be a single object.
* @param target - the target member to receive the state from. if null, state is retrieved from coordinator
* @param timeout - the number of milliseconds to wait for the operation to complete successfully
* @return true of the state was received, false if the operation timed out
*/
@Override // GemStoneAddition
public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_SINGLE, target);
info.timeout=timeout;
return _getState(new Event(Event.GET_STATE, info), timeout);
}
/**
* Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
* Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
* milliseconds have elapsed. The argument of GET_STATE_OK should be a vector of objects.
* @param targets - the target members to receive the state from ( an Address list )
* @param timeout - the number of milliseconds to wait for the operation to complete successfully
* @return true of the state was received, false if the operation timed out
*/
@Override // GemStoneAddition
public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
StateTransferInfo info=new StateTransferInfo(StateTransferInfo.GET_FROM_MANY, targets);
return _getState(new Event(Event.GET_STATE, info), timeout);
}
/**
* Called by the application is response to receiving a <code>getState()</code> object when
* calling <code>receive()</code>.
* When the application receives a getState() message on the receive() method,
* it should call returnState() to reply with the state of the application
* @param state The state of the application as a byte buffer
* (to send over the network).
*/
@Override // GemStoneAddition
public void returnState(byte[] state) {
down(new Event(Event.GET_APPLSTATE_OK, state));
}
/**
* Callback method <BR>
* Called by the ProtocolStack when a message is received.
* It will be added to the message queue from which subsequent
* <code>Receive</code>s will dequeue it.
* @param evt the event carrying the message from the protocol stack
*/
public void up(Event evt) {
int type=evt.getType();
Message msg;
switch(type) {
case Event.MSG:
msg=(Message)evt.getArg();
if(!receive_local_msgs) { // discard local messages (sent by myself to me)
if(local_addr != null && msg.getSrc() != null)
if(local_addr.equals(msg.getSrc()))
return;
}
break;
case Event.VIEW_CHANGE:
View tmp=(View)evt.getArg();
if(tmp instanceof MergeView)
my_view=new View(tmp.getVid(), tmp.getMembers());
else
my_view=tmp;
// crude solution to bug #775120: if we get our first view *before* the CONNECT_OK,
// we simply set the state to connected
if(connected == false) {
connected=true;
connect_promise.setResult(Boolean.TRUE);
}
// unblock queueing of messages due to previous BLOCK event:
down(new Event(Event.STOP_QUEUEING));
if(!receive_views) // discard if client has not set receving views to on
return;
//if(connected == false)
// my_view=(View)evt.getArg();
break;
case Event.SUSPECT:
if(!receive_suspects)
return;
break;
case Event.GET_APPLSTATE: // return the application's state
if(!receive_get_states) { // if not set to handle state transfers, send null state
down(new Event(Event.GET_APPLSTATE_OK, null));
return;
}
break;
case Event.CONFIG:
HashMap config=(HashMap)evt.getArg();
if(config != null && config.containsKey("state_transfer"))
state_transfer_supported=((Boolean)config.get("state_transfer")).booleanValue();
break;
case Event.BLOCK:
// If BLOCK is received by application, then we trust the application to not send
// any more messages until a VIEW_CHANGE is received. Otherwise (BLOCKs are disabled),
// we queue any messages sent until the next VIEW_CHANGE (they will be sent in the
// next view)
if(!receive_blocks) { // discard if client has not set 'receiving blocks' to 'on'
down(new Event(Event.BLOCK_OK));
down(new Event(Event.START_QUEUEING));
return;
}
break;
case Event.CONNECT_OK:
connect_promise.setResult(Boolean.TRUE);
break;
case Event.DISCONNECT_OK:
// disconnect_promise.setResult(Boolean.TRUE);
break;
case Event.GET_STATE_OK:
Object state=evt.getArg();
state_promise.setResult(state);
if(up_handler != null) {
up_handler.up(evt);
return;
}
if(state != null) {
if(receiver != null) {
receiver.setState((byte[])state);
}
else {
try {mq.add(new Event(Event.STATE_RECEIVED, state));} catch(Exception e) {}
}
}
break;
case Event.SET_LOCAL_ADDRESS:
local_addr_promise.setResult(evt.getArg());
break;
case Event.EXIT:
this.exitEvent = evt; // GemStoneAddition
handleExit(evt);
return; // no need to pass event up; already done in handleExit()
case Event.BLOCK_SEND: // emitted by FLOW_CONTROL
if(log.isInfoEnabled()) log.info(ExternalStrings.JChannel_RECEIVED_BLOCK_SEND);
block_sending.set(Boolean.TRUE);
break;
case Event.UNBLOCK_SEND: // emitted by FLOW_CONTROL
if(log.isInfoEnabled()) log.info(ExternalStrings.JChannel_RECEIVED_UNBLOCK_SEND);
block_sending.set(Boolean.FALSE);
break;
default:
break;
}
// If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
if(up_handler != null) {
up_handler.up(evt);
return;
}
switch(type) {
case Event.MSG:
if(receiver != null) {
receiver.receive((Message)evt.getArg());
return;
}
break;
case Event.VIEW_CHANGE:
if(receiver != null) {
receiver.viewAccepted((View)evt.getArg());
return;
}
break;
case Event.SUSPECT:
if(receiver != null) {
receiver.suspect((SuspectMember)evt.getArg()); // GemStoneAddition SuspectMember struct
return;
}
break;
case Event.GET_APPLSTATE:
if(receiver != null) {
byte[] tmp_state=receiver.getState();
returnState(tmp_state);
return;
}
break;
case Event.BLOCK:
if(receiver != null) {
receiver.block();
return;
}
break;
default:
break;
}
if(receiver == null && // GemStoneAddition
(type == Event.MSG || type == Event.VIEW_CHANGE || type == Event.SUSPECT ||
type == Event.GET_APPLSTATE || type == Event.BLOCK)) {
try {
mq.add(evt);
}
catch(Exception e) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", e);
}
}
}
/**
* Sends a message through the protocol stack if the stack is available
* @param evt the message to send down, encapsulated in an event
*/
@Override // GemStoneAddition
public void down(Event evt) {
if(evt == null) return;
if(suspended) {
synchronized(suspend_mutex) {
while(suspended) {
// GemStoneAddition -- be more aggressive about preserving
// the interrupt bit
boolean interrupted = Thread.interrupted();
try {
suspend_mutex.wait();
}
catch(InterruptedException e) {
interrupted = true; // GemStoneAddition
}
finally { // GemStoneAddition
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
}
int type=evt.getType();
// only block for messages; all other events are passed through
// we use double-checked locking; it is okay to 'lose' one or more messages because block_sending changes
// to true after an initial false value
if(type == Event.MSG && block_sending.get().equals(Boolean.TRUE)) {
if(log.isTraceEnabled()) log.trace("down() blocks because block_sending == true");
block_sending.waitUntil(Boolean.FALSE);
}
// handle setting of additional data (kludge, will be removed soon)
if(type == Event.CONFIG) {
try {
Map m=(Map)evt.getArg();
if(m != null && m.containsKey("additional_data")) {
additional_data=(byte[])m.get("additional_data");
}
} catch (RuntimeException t) {
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_CONFIG_EVENT_DID_NOT_CONTAIN_A_HASHMAP__0, t);
}
}
if(prot_stack != null)
prot_stack.down(evt);
else
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_NO_PROTOCOL_STACK_AVAILABLE);
}
/** Send() blocks from now on, until resume() is called */
public void suspend() {
synchronized(suspend_mutex) {
suspended=true;
}
}
/** Send() unblocks */
public void resume() {
synchronized(suspend_mutex) {
suspended=false;
suspend_mutex.notifyAll();
}
}
public boolean isSuspended() {
return suspended;
}
public String toString(boolean details) {
StringBuffer sb=new StringBuffer();
sb.append("local_addr=").append(local_addr).append('\n');
sb.append("channel_name=").append(channel_name).append('\n');
sb.append("my_view=").append(my_view).append('\n');
sb.append("connected=").append(connected).append('\n');
sb.append("closed=").append(closed).append('\n');
if(mq != null)
sb.append("incoming queue size=").append(mq.size()).append('\n');
if(details) {
sb.append("block_sending=").append(block_sending).append('\n');
sb.append("receive_views=").append(receive_views).append('\n');
sb.append("receive_suspects=").append(receive_suspects).append('\n');
sb.append("receive_blocks=").append(receive_blocks).append('\n');
sb.append("receive_local_msgs=").append(receive_local_msgs).append('\n');
sb.append("receive_get_states=").append(receive_get_states).append('\n');
sb.append("auto_reconnect=").append(auto_reconnect).append('\n');
sb.append("auto_getstate=").append(auto_getstate).append('\n');
sb.append("state_transfer_supported=").append(state_transfer_supported).append('\n');
sb.append("props=").append(props).append('\n');
}
return sb.toString();
}
/* ----------------------------------- Private Methods ------------------------------------- */
/**
* Initializes all variables. Used after <tt>close()</tt> or <tt>disconnect()</tt>,
* to be ready for new <tt>connect()</tt>
*/
private void init() {
local_addr=null;
channel_name=null;
my_view=null;
// changed by Bela Sept 25 2003
//if(mq != null && mq.closed())
// mq.reset();
connect_promise.reset();
// disconnect_promise.reset();
connected=false;
block_sending.set(Boolean.FALSE);
}
/**
* health check.<BR>
* throws a ChannelNotConnected exception if the channel is not connected
*/
private final void checkNotConnected() throws ChannelNotConnectedException {
if(!connected)
throw new ChannelNotConnectedException();
}
/**
* health check<BR>
* throws a ChannelClosed exception if the channel is closed
*/
private final void checkClosed() throws ChannelClosedException {
if(closed) {
// GemStoneChange - if there's a closeException, set it as the
// cause of the CCE
ChannelClosedException ex = new ChannelClosedException();
if (closeException != null) {
ex.initCause(closeException);
}
throw ex;
}
}
/**
* returns the value of the event<BR>
* These objects will be returned<BR>
* <PRE>
* <B>Event Type - Return Type</B>
* Event.MSG - returns a Message object
* Event.VIEW_CHANGE - returns a View object
* Event.SUSPECT - returns a SuspectEvent object
* Event.BLOCK - returns a new BlockEvent object
* Event.GET_APPLSTATE - returns a GetStateEvent object
* Event.STATE_RECEIVED- returns a SetStateEvent object
* Event.Exit - returns an ExitEvent object
* All other - return the actual Event object
* </PRE>
* @param evt - the event of which you want to extract the value
* @return the event value if it matches the select list,
* returns null if the event is null
* returns the event itself if a match (See above) can not be made of the event type
*/
static Object getEvent(Event evt) {
if(evt == null)
return null; // correct ?
switch(evt.getType()) {
case Event.MSG:
return evt.getArg();
case Event.VIEW_CHANGE:
return evt.getArg();
case Event.SUSPECT:
return new SuspectEvent((SuspectMember)evt.getArg());
case Event.BLOCK:
return new BlockEvent();
case Event.GET_APPLSTATE:
return new GetStateEvent(evt.getArg());
case Event.STATE_RECEIVED:
return new SetStateEvent((byte[])evt.getArg());
case Event.EXIT:
return new ExitEvent();
default:
return evt;
}
}
/**
* Receives the state from the group and modifies the JChannel.state object<br>
* This method initializes the local state variable to null, and then sends the state
* event down the stack. It waits for a GET_STATE_OK event to bounce back
* @param evt the get state event, has to be of type Event.GET_STATE
* @param timeout the number of milliseconds to wait for the GET_STATE_OK response
* @return true of the state was received, false if the operation timed out
*/
boolean _getState(Event evt, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
checkClosed();
checkNotConnected();
if(!state_transfer_supported) {
log.error("fetching state will fail as state transfer is not supported. "
+ "Add one of the STATE_TRANSFER protocols to your protocol configuration");
return false;
}
state_promise.reset();
down(evt);
byte[] state=(byte[])state_promise.getResult(timeout);
if(state != null) // state set by GET_STATE_OK event
return true;
else
return false;
}
/**
* Disconnects and closes the channel.
* This method does the folloing things
* <ol>
* <li>Calls <code>this.disconnect</code> if the disconnect parameter is true
* <li>Calls <code>Queue.close</code> on mq if the close_mq parameter is true
* <li>Calls <code>ProtocolStack.stop</code> on the protocol stack
* <li>Calls <code>ProtocolStack.destroy</code> on the protocol stack
* <li>Sets the channel closed and channel connected flags to true and false
* <li>Notifies any channel listener of the channel close operation
* </ol>
*/
protected/*GemStoneAddition*/ void _close(boolean disconnect, boolean close_mq) {
if(closed)
return;
if(!disconnect)
resume();
if(disconnect)
disconnect(); // leave group if connected
if(close_mq) {
try {
if(mq != null)
mq.close(false); // closes and removes all messages
}
catch(Exception e) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", e);
}
}
if(prot_stack != null) {
try {
prot_stack.stopStack();
prot_stack.destroy();
}
catch(Exception e) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", e);
}
}
closed=true;
closing = false; // GemStoneAddition
connected=false;
notifyChannelClosed(this);
init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
}
/** returns the exception that caused the stack to close, if there is one */
public Exception getCloseException() {
return this.closeException;
}
/**
* Creates a separate thread to close the protocol stack.
* This is needed because the thread that called JChannel.up() with the EXIT event would
* hang waiting for up() to return, while up() actually tries to kill that very thread.
* This way, we return immediately and allow the thread to terminate.
*/
private void handleExit(Event evt) {
closing = true; // GemStoneAddition
if (evt.getArg() instanceof RuntimeException) { // GemStoneAddition
closeException = (RuntimeException)evt.getArg();
}
// else {
// log.getLogWriter().warning("DEBUGGING: handleExit with no exception", new Exception("Stack trace"));
// }
notifyChannelShunned();
synchronized (this) { // GemStoneAddition
if(closer != null && !closer.isAlive())
closer=null;
if(closer == null) {
if(log.isInfoEnabled())
log.info(ExternalStrings.JChannel_RECEIVED_AN_EXIT_EVENT_WILL_LEAVE_THE_CHANNEL);
closer=new CloserThread(evt);
closer.start();
}
} // synchronized
}
/** GemStoneAddition is this channel closing or closed? */
@Override // GemStoneAddition
public boolean closing() {
return closing || closed;
}
/** GemStoneAddition for testing - wait for closer thread to do its job */
public void waitForClose() throws InterruptedException {
Thread c;
synchronized (this) { // GemStoneAddition
c = this.closer;
}
if (c != null && c.isAlive()) {
c.join();
}
}
/* ------------------------------- End of Private Methods ---------------------------------- */
class CloserThread extends Thread {
final Event evt;
final Thread t=null;
final AtomicBoolean done = new AtomicBoolean(false);
CloserThread(Event evt) {
this.evt=evt;
setName("CloserThread");
// GemStoneAddition: closer thread must keep cache servers alive
// until a new Acceptor
setDaemon(false);
}
@Override // GemStoneAddition
public void run() {
try {
String old_channel_name=channel_name; // remember because close() will null it
if(log.isInfoEnabled())
log.info(ExternalStrings.JChannel_CLOSING_THE_CHANNEL);
JChannel.this.prot_stack.gfPeerFunctions.beforeChannelClosing("before channel closing", JChannel.this.closeException);
_close(false, false); // do not disconnect before closing channel, do not close mq (yet !)
if(up_handler != null)
up_handler.up(this.evt);
else {
try {
if(receiver == null)
mq.add(this.evt);
else
receiver.channelClosing(JChannel.this, JChannel.this.closeException);
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error("caught unexpected exception", ex);
}
}
if(mq != null) {
Util.sleep(500); // give the mq thread a bit of time to deliver EXIT to the application
try {
mq.close(false);
}
catch(Exception ex) {
}
}
if (!connected) {
connect_promise.setResult(Boolean.FALSE); // GemStoneAddition - let jchannel.connect exit
}
if(auto_reconnect) {
try {
if(log.isInfoEnabled()) log.info(ExternalStrings.JChannel_RECONNECTING_TO_GROUP__0, old_channel_name);
open();
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_FAILURE_REOPENING_CHANNEL__0, ex);
return;
}
try {
if(additional_data != null) {
// set previously set additional data
Map m=new HashMap(11);
m.put("additional_data", additional_data);
down(new Event(Event.CONFIG, m));
}
connect(old_channel_name);
notifyChannelReconnected(local_addr);
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error(ExternalStrings.JChannel_FAILURE_RECONNECTING_TO_CHANNEL__0, ex);
return;
}
}
if(auto_getstate) {
if(log.isInfoEnabled())
log.info(ExternalStrings.JChannel_FETCHING_THE_STATE_AUTO_GETSTATETRUE);
boolean rc=JChannel.this.getState(null, GET_STATE_DEFAULT_TIMEOUT);
if(rc)
if(log.isInfoEnabled()) log.info(ExternalStrings.JChannel_STATE_WAS_RETRIEVED_SUCCESSFULLY);
else
if(log.isInfoEnabled()) log.info(ExternalStrings.JChannel_STATE_TRANSFER_FAILED);
}
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error("JGroups closer thread caught exception", ex);
}
finally {
synchronized (CloserThread.this) { // GemStoneAddition
done.set(true);
closer=null;
}
}
}
}
/**
* GemStoneAddition
* returns the current seqno for the NAKACK layer, or zero if there is no NAKACK layer
*/
public long getMulticastState()
{
NAKACK nakack = (NAKACK)prot_stack.findProtocol("NAKACK");
if (nakack == null) {
return 0;
}
else {
return nakack.getCurrentSeqno();
}
}
/**
* GemStoneAddition
* waits for the NAKACK multicast state for the given member to reach
* the given seqno
*/
public void waitForMulticastState(Address otherMember, long seqno)
throws InterruptedException
{
if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition
NAKACK nakack = (NAKACK)prot_stack.findProtocol("NAKACK");
if (nakack != null) {
long highest;
while ((highest=nakack.getHighSeqnoDispatched(otherMember)) < seqno) {
if (log.getLogWriter().fineEnabled()) {
log.getLogWriter().fine("Waiting for multicast seqno for " + otherMember + ", current=" + highest + ", need=" + seqno);
}
Thread.sleep(100);
}
}
}
/*
* Code ONLY for help in testing.
*/
public void setClosed(boolean isClosed) {
this.closed = isClosed;
}
}