blob: 091db4a2d737ddb2975819ca6821a03074d289e7 [file] [log] [blame]
/** Notice of modification as required by the LGPL
* This file was modified by Gemstone Systems Inc. on
* $Date$
**/
package com.gemstone.org.jgroups.protocols;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.Vector;
import com.gemstone.org.jgroups.Address;
import com.gemstone.org.jgroups.Channel;
import com.gemstone.org.jgroups.Event;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.JGroupsVersion;
import com.gemstone.org.jgroups.Message;
import com.gemstone.org.jgroups.View;
import com.gemstone.org.jgroups.oswego.concurrent.BoundedLinkedQueue;
import com.gemstone.org.jgroups.protocols.pbcast.NAKACK.RetransmissionTooLargeException;
import com.gemstone.org.jgroups.stack.IpAddress;
import com.gemstone.org.jgroups.stack.Protocol;
import com.gemstone.org.jgroups.util.Buffer;
import com.gemstone.org.jgroups.util.ExposedBufferedInputStream;
import com.gemstone.org.jgroups.util.ExposedBufferedOutputStream;
import com.gemstone.org.jgroups.util.ExposedByteArrayInputStream;
import com.gemstone.org.jgroups.util.ExposedByteArrayOutputStream;
import com.gemstone.org.jgroups.util.ExternalStrings;
import com.gemstone.org.jgroups.util.GemFireTracer;
import com.gemstone.org.jgroups.util.List;
import com.gemstone.org.jgroups.util.QueueClosedException;
import com.gemstone.org.jgroups.util.TimeScheduler;
import com.gemstone.org.jgroups.util.Util;
//import com.gemstone.org.jgroups.util.Queue; // 1.5
/**
* Generic transport - specific implementations should extend this abstract class.
* Features which are provided to the subclasses include
* <ul>
* <li>version checking
* <li>marshalling and unmarshalling
* <li>message bundling (handling single messages, and message lists)
* <li>incoming packet handler
* <li>loopback
* </ul>
* A subclass has to override
* <ul>
* <li>{@link #sendToAllMembers(byte[], int, int)}
* <li>{@link #sendToSingleMember(com.gemstone.org.jgroups.Address, boolean, byte[], int, int)}
* <li>{@link #init()}
* <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves
* (e.g., created their sockets).
* <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves
* <li>{@link #destroy()}
* </ul>
* The create() or start() method has to create a local address.<br>
* The receive(Address, Address, byte[], int, int) method must
* be called by subclasses when a unicast or multicast message has been received.
* @author Bela Ban
* @version $Id: TP.java,v 1.53 2005/12/23 17:08:22 belaban Exp $
*/
public abstract class TP extends Protocol {
public static final/*GemStoneAddition*/ boolean VERBOSE = Boolean.getBoolean("TP.VERBOSE")
|| Boolean.getBoolean("DistributionManager.DEBUG_JAVAGROUPS");
/** The address (host and port) of this member */
Address local_addr=null;
/** The name of the group to which this member is connected */
String channel_name=null;
/** The interface (NIC) which should be used by this transport */
InetAddress bind_addr=null;
/** Overrides bind_addr and -Dbind.address: let's the OS return the local host address */
// boolean use_local_host=false; GemStoneAddition
/** If true, the transport should use all available interfaces to receive multicast messages
* @deprecated Use {@link #receive_on_all_interfaces} instead */
// boolean bind_to_all_interfaces=false; GemStoneAddition
/** If true, the transport should use all available interfaces to receive multicast messages */
boolean receive_on_all_interfaces=false;
/** List of NetworkInterface of interfaces to receive multicasts on. The multicast receive socket will listen
* on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g.
* "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once.
* If this property is set, it override receive_on_all_interfaces.
*/
java.util.List receive_interfaces=null;
/** If true, the transport should use all available interfaces to send multicast messages. This means
* the same multicast message is sent N times, so use with care */
boolean send_on_all_interfaces=false;
/** List of NetworkInterface of interfaces to send multicasts on. The multicast send socket will send the
* same multicast message on all of these interfaces. This is a comma-separated list of IP addresses or
* interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded.
* If this property is set, it override send_on_all_interfaces.
*/
java.util.List send_interfaces=null;
/** The port to which the transport binds. 0 means to bind to any (ephemeral) port */
int bind_port=0;
int port_range=1; // 27-6-2003 bgooren, Only try one port by default
/** The members of this group (updated when a member joins or leaves) */
final Vector members=new Vector(11);
View view=null;
/** Pre-allocated byte stream. Used for marshalling messages. Will grow as needed */
// final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);
// final ExposedBufferedOutputStream buf_out_stream=new ExposedBufferedOutputStream(out_stream, 1024);
// final ExposedDataOutputStream dos=new ExposedDataOutputStream(buf_out_stream);
final ExposedByteArrayInputStream in_stream=new ExposedByteArrayInputStream(new byte[]{'0'});
final ExposedBufferedInputStream buf_in_stream=new ExposedBufferedInputStream(in_stream);
final DataInputStream dis=new DataInputStream(buf_in_stream);
/** If true, messages sent to self are treated specially: unicast messages are
* looped back immediately, multicast messages get a local copy first and -
* when the real copy arrives - it will be discarded. Useful for Window
* media (non)sense */
boolean loopback=false;
/** Discard packets with a different version. Usually minor version differences are okay. Setting this property
* to true means that we expect the exact same version on all incoming packets */
boolean discard_incompatible_packets=false;
/** Sometimes receivers are overloaded (they have to handle de-serialization etc).
* Packet handler is a separate thread taking care of de-serialization, receiver
* thread(s) simply put packet in queue and return immediately. Setting this to
* true adds one more thread */
boolean use_incoming_packet_handler=false;
/** Used by packet handler to store incoming DatagramPackets */
com.gemstone.org.jgroups.util.Queue incoming_packet_queue=null;
/** Dequeues DatagramPackets from packet_queue, unmarshalls them and
* calls <tt>handleIncomingUdpPacket()</tt> */
IncomingPacketHandler incoming_packet_handler=null;
/** Used by packet handler to store incoming Messages */
com.gemstone.org.jgroups.util.Queue incoming_msg_queue=null;
IncomingMessageHandler incoming_msg_handler;
/** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
* value uses an additional thread */
boolean use_outgoing_packet_handler=false;
/** Used by packet handler to store outgoing DatagramPackets */
BoundedLinkedQueue outgoing_queue=null;
/** max number of elements in the bounded outgoing_queue */
int outgoing_queue_max_size=2000;
OutgoingPacketHandler outgoing_packet_handler=null;
/** If set it will be added to <tt>local_addr</tt>. Used to implement
* for example transport independent addresses */
byte[] additional_data=null;
/** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
than the largest datagram packet size in case of UDP */
int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();
/** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
* max_bundle_timeout has been exceeded (whichever occurs faster)
*/
long max_bundle_timeout=20;
/** Enabled bundling of smaller messages into bigger ones */
boolean enable_bundling=false;
Bundler bundler=null;
TimeScheduler timer=null;
DiagnosticsHandler diag_handler=null;
boolean enable_diagnostics=true;
String diagnostics_addr="224.0.0.75";
int diagnostics_port=7500;
/** HashMap key=Address, value=Address. Keys=senders, values=destinations. For each incoming message M with sender S, adds
* an entry with key=S and value= sender's IP address and port.
*/
// HashMap addr_translation_table=new HashMap(); GemStoneAddition
// boolean use_addr_translation=false; GemStoneAddition
TpHeader header;
final String name=getName();
static final byte LIST = 1; // we have a list of messages rather than a single message when set
static final byte MULTICAST = 2; // message is a multicast (versus a unicast) message when set
long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;
transient static NumberFormat f;
// private boolean connected; // GemStoneAddition - set to true once connected
/** GemStoneAddition - ping/pong is used to see if GemFire is listening on a port */
private boolean pongReceived;
private final Object pongSync = new Object();
/** GemStoneAddition - version to use when multicasting. This defaults
* to 1.0.0 to allow multicast discovery to work during a rolling upgrade.
*/
private short multicastVersion = Message.multicastVersion;
static {
f=NumberFormat.getNumberInstance();
f.setGroupingUsed(false);
f.setMaximumFractionDigits(2);
}
/**
* Just ensure that this class gets loaded.
*
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
// just using java.net, java.lang; we're safe.
}
/**
* Closes the diagnostic handler, if it is open
*
* @see SystemFailure#emergencyClose()
*/
public void emergencyClose() {
DiagnosticsHandler ds = diag_handler;
if (ds != null) {
MulticastSocket ms = ds.diag_sock;
if (ms != null) {
ms.close();
}
Thread thr = ds.t;
if (thr != null) {
thr.interrupt();
}
}
}
/**
* Creates the TP protocol, and initializes the
* state variables, does however not start any sockets or threads.
*/
protected TP() {
}
/**
* debug only
*/
@Override // GemStoneAddition
public String toString() {
return name + "(local address: " + local_addr + ')';
}
@Override // GemStoneAddition
public void resetStats() {
num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;
}
public long getNumMessagesSent() {return num_msgs_sent;}
public long getNumMessagesReceived() {return num_msgs_received;}
public long getNumBytesSent() {return num_bytes_sent;}
public long getNumBytesReceived() {return num_bytes_received;}
public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";}
public InetAddress getInetBindAddress() { return bind_addr; } // GemStoneAddition
public void setBindAddress(String bind_addr) throws UnknownHostException {
this.bind_addr=InetAddress.getByName(bind_addr);
}
/** @deprecated Use {@link #isReceiveOnAllInterfaces()} instead */
@Deprecated // GemStoneAddition
public boolean getBindToAllInterfaces() {return receive_on_all_interfaces;}
public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;}
public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}
public java.util.List getReceiveInterfaces() {return receive_interfaces;}
public boolean isSendOnAllInterfaces() {return send_on_all_interfaces;}
public java.util.List getSendInterfaces() {return send_interfaces;}
public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;}
public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;}
public boolean isEnableBundling() {return enable_bundling;}
public void setEnableBundling(boolean flag) {enable_bundling=flag;}
public int getMaxBundleSize() {return max_bundle_size;}
public void setMaxBundleSize(int size) {max_bundle_size=size;}
public long getMaxBundleTimeout() {return max_bundle_timeout;}
public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;}
public int getOutgoingQueueSize() {return outgoing_queue != null? outgoing_queue.size() : 0;}
public int getIncomingQueueSize() {return incoming_packet_queue != null? incoming_packet_queue.size() : 0;}
public Address getLocalAddress() {return local_addr;}
public String getChannelName() {return channel_name;}
public boolean isLoopback() {return loopback;}
public void setLoopback(boolean b) {loopback=b;}
public boolean isUseIncomingPacketHandler() {return use_incoming_packet_handler;}
public boolean isUseOutgoingPacketHandler() {return use_outgoing_packet_handler;}
public int getOutgoingQueueMaxSize() {return outgoing_queue != null? outgoing_queue_max_size : 0;}
public void setOutgoingQueueMaxSize(int new_size) {
if(outgoing_queue != null) {
outgoing_queue.setCapacity(new_size);
outgoing_queue_max_size=new_size;
}
}
@Override // GemStoneAddition
public Map dumpStats() {
Map retval=super.dumpStats();
if(retval == null)
retval=new HashMap();
retval.put("num_msgs_sent", Long.valueOf(num_msgs_sent));
retval.put("num_msgs_received", Long.valueOf(num_msgs_received));
retval.put("num_bytes_sent", Long.valueOf(num_bytes_sent));
retval.put("num_bytes_received", Long.valueOf(num_bytes_received));
return retval;
}
/**
* Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N
* messages, one for each member
* @param data The data to be sent. This is not a copy, so don't modify it
* @param offset
* @param length
* @throws Exception
*/
public abstract void sendToAllMembers(byte[] data, int offset, int length) throws Exception;
/**
* Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N
* messages, one for each member
* @param dest Must be a non-null unicast address
* @param data The data to be sent. This is not a copy, so don't modify it
* @param offset
* @param length
* @throws Exception
*/
public abstract void sendToSingleMember(Address dest, boolean isJoinResponse, byte[] data, int offset, int length) throws Exception;
public abstract String getInfo();
public abstract void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast);
public abstract void postUnmarshallingList(Message msg, Address dest, boolean multicast);
private String _getInfo() {
StringBuffer sb=new StringBuffer();
sb.append(local_addr).append(" (").append(channel_name).append(") ").append("\n");
sb.append("local_addr=").append(local_addr).append("\n");
sb.append("group_name=").append(channel_name).append("\n");
sb.append("Version=").append(JGroupsVersion.description).append(", cvs=\"").append(JGroupsVersion.cvs).append("\"\n");
sb.append("view: ").append(view).append('\n');
sb.append(getInfo());
return sb.toString();
}
protected/*GemStoneAddition*/ void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String request) {
try {
StringTokenizer tok=new StringTokenizer(request);
String req=tok.nextToken();
String info="n/a";
if(req.trim().toLowerCase().startsWith("query")) {
ArrayList l=new ArrayList(tok.countTokens());
while(tok.hasMoreTokens())
l.add(tok.nextToken().trim().toLowerCase());
info=_getInfo();
if(l.contains("jmx")) {
if(info == null) info="";
Channel ch=stack.getChannel();
if(ch != null) {
Map m=ch.dumpStats();
StringBuffer sb=new StringBuffer();
sb.append("stats:\n");
for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
sb.append(it.next()).append("\n");
}
info+=sb.toString();
}
}
if(l.contains("props")) {
String p=stack.printProtocolSpecAsXML();
info+="\nprops:\n" + p;
}
}
byte[] diag_rsp=info.getBytes();
if(log.isDebugEnabled())
log.debug("sending diag response to " + sender);
sendResponse(sock, sender, diag_rsp);
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable t) {
if(log.isErrorEnabled())
log.error(ExternalStrings.TP_FAILED_SENDING_DIAG_RSP_TO__0, sender, t);
}
}
/** GemStoneAddition - AvailablePort support. Send a response to a ping from
another process that's groping for an available multicast port */
private void sendPingResponse(Address asender) {
byte msgbuf[] = new byte[4];
msgbuf[0] = (byte)'p';
msgbuf[1] = (byte)'o';
msgbuf[2] = (byte)'n';
msgbuf[3] = (byte)'g';
IpAddress sender = (IpAddress)asender;
sender.setName(""); // The address will hold this VM's default attributes, so get rid of the name field before logging it
if (log.isDebugEnabled()) {
log.debug("Responding to ping-pong request from " + sender);
}
try {
doSend(new Buffer(msgbuf, 0, msgbuf.length), false, sender, false);
}
catch (Exception e) {
if (log.getLogWriter().fineEnabled()) {
log.getLogWriter().fine("exception sending ping response to " + sender, e);
}
}
}
private void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException {
DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender);
sock.send(p);
}
/* ------------------------------------------------------------------------------- */
/*------------------------------ Protocol interface ------------------------------ */
/**
* Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
*/
@Override // GemStoneAddition
public void start() throws Exception {
timer=stack.timer;
if(timer == null)
throw new Exception("timer is null");
if(enable_diagnostics) {
diag_handler=new DiagnosticsHandler();
diag_handler.start();
}
if(use_incoming_packet_handler) {
incoming_packet_queue=new com.gemstone.org.jgroups.util.Queue();
incoming_packet_handler=new IncomingPacketHandler();
incoming_packet_handler.start();
}
if(loopback) {
incoming_msg_queue=new com.gemstone.org.jgroups.util.Queue();
incoming_msg_handler=new IncomingMessageHandler();
incoming_msg_handler.start();
}
if(use_outgoing_packet_handler) {
outgoing_queue=new BoundedLinkedQueue(outgoing_queue_max_size);
outgoing_packet_handler=new OutgoingPacketHandler();
outgoing_packet_handler.start();
}
if(enable_bundling) {
bundler=new Bundler();
}
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
}
@Override // GemStoneAddition
public void stop() {
if(diag_handler != null) {
diag_handler.stop();
diag_handler=null;
}
// 1. Stop the outgoing packet handler thread
if(outgoing_packet_handler != null)
outgoing_packet_handler.stop();
// 2. Stop the incoming packet handler thread
if(incoming_packet_handler != null)
incoming_packet_handler.stop();
// 3. Finally stop the incoming message handler
if(incoming_msg_handler != null)
incoming_msg_handler.stop();
}
/**
* Setup the Protocol instance according to the configuration string
* @return true if no other properties are left.
* false if the properties still have data in them, ie ,
* properties are left over and not handled by the protocol stack
*/
@Override // GemStoneAddition
public boolean setProperties(Properties props) {
String str;
String tmp = null;
super.setProperties(props);
// PropertyPermission not granted if running in an untrusted environment with JNLP.
try {
tmp=System.getProperty("bind.address");
if(Util.isBindAddressPropertyIgnored()) {
tmp=null;
}
}
catch (SecurityException ex){
}
if(tmp != null)
str=tmp;
else
str=props.getProperty("bind_addr");
if(str != null) {
try {
bind_addr=InetAddress.getByName(str);
}
catch(UnknownHostException unknown) {
if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known");
return false;
}
props.remove("bind_addr");
}
str=props.getProperty("use_local_host");
if(str != null) {
// use_local_host=Boolean.valueOf/*GemStoneAddition*/(str).booleanValue(); GemStoneAddition(omitted)
props.remove("use_local_host");
}
str=props.getProperty("bind_to_all_interfaces");
if(str != null) {
receive_on_all_interfaces=Boolean.valueOf/*GemStoneAddition*/(str).booleanValue();
props.remove("bind_to_all_interfaces");
log.warn("bind_to_all_interfaces has been deprecated; use receive_on_all_interfaces instead");
}
str=props.getProperty("receive_on_all_interfaces");
if(str != null) {
receive_on_all_interfaces=Boolean.valueOf/*GemStoneAddition*/(str).booleanValue();
props.remove("receive_on_all_interfaces");
}
str=props.getProperty("receive_interfaces");
if(str != null) {
try {
receive_interfaces=parseInterfaceList(str);
props.remove("receive_interfaces");
}
catch(Exception e) {
log.error(ExternalStrings.TP_ERROR_DETERMINING_INTERFACES__0_, str, e);
return false;
}
}
str=props.getProperty("send_on_all_interfaces");
if(str != null) {
send_on_all_interfaces=Boolean.valueOf/*GemStoneAddition*/(str).booleanValue();
props.remove("send_on_all_interfaces");
}
str=props.getProperty("send_interfaces");
if(str != null) {
try {
send_interfaces=parseInterfaceList(str);
props.remove("send_interfaces");
}
catch(Exception e) {
log.error(ExternalStrings.TP_ERROR_DETERMINING_INTERFACES__0_, str, e);
return false;
}
}
str=props.getProperty("bind_port");
if(str != null) {
bind_port=Integer.parseInt(str);
props.remove("bind_port");
}
str=props.getProperty("port_range");
if(str != null) {
port_range=Integer.parseInt(str);
props.remove("port_range");
}
str=props.getProperty("loopback");
if(str != null) {
loopback=Boolean.valueOf(str).booleanValue();
props.remove("loopback");
}
str=props.getProperty("discard_incompatible_packets");
if(str != null) {
discard_incompatible_packets=Boolean.valueOf(str).booleanValue();
props.remove("discard_incompatible_packets");
}
// this is deprecated, just left for compatibility (use use_incoming_packet_handler)
str=props.getProperty("use_packet_handler");
if(str != null) {
use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
props.remove("use_packet_handler");
if(warn) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
}
str=props.getProperty("use_incoming_packet_handler");
if(str != null) {
use_incoming_packet_handler=Boolean.valueOf(str).booleanValue();
props.remove("use_incoming_packet_handler");
}
str=props.getProperty("use_outgoing_packet_handler");
if(str != null) {
use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue();
props.remove("use_outgoing_packet_handler");
}
str=props.getProperty("outgoing_queue_max_size");
if(str != null) {
outgoing_queue_max_size=Integer.parseInt(str);
props.remove("outgoing_queue_max_size");
if(outgoing_queue_max_size <= 0) {
if(log.isWarnEnabled())
log.warn("outgoing_queue_max_size of " + outgoing_queue_max_size + " is invalid, setting it to 1");
outgoing_queue_max_size=1;
}
}
str=props.getProperty("max_bundle_size");
if(str != null) {
int bundle_size=Integer.parseInt(str);
if(bundle_size > max_bundle_size) {
// GemStoneAddition - don't prevent stack from starting if this happens
if(log.isWarnEnabled()) log.warn("auto sensed max datagram size (" + bundle_size +
") is greater than udp_fragment_size setting plus overhead (" + max_bundle_size + ')');
//return false;
}
if(bundle_size <= 0) {
if(log.isErrorEnabled()) log.error(ExternalStrings.TP_MAX_BUNDLE_SIZE__0__IS__0, bundle_size);
return false;
}
max_bundle_size=bundle_size;
props.remove("max_bundle_size");
}
str=props.getProperty("max_bundle_timeout");
if(str != null) {
max_bundle_timeout=Long.parseLong(str);
if(max_bundle_timeout <= 0) {
if(log.isErrorEnabled()) log.error(ExternalStrings.TP_MAX_BUNDLE_TIMEOUT_OF__0__IS_INVALID, max_bundle_timeout);
return false;
}
props.remove("max_bundle_timeout");
}
str=props.getProperty("enable_bundling");
if(str != null) {
enable_bundling=Boolean.valueOf(str).booleanValue();
props.remove("enable_bundling");
}
str=props.getProperty("use_addr_translation");
if(str != null) {
// use_addr_translation=Boolean.valueOf(str).booleanValue(); GemStoneAddition
props.remove("use_addr_translation");
}
str=props.getProperty("enable_diagnostics");
if(str != null) {
enable_diagnostics=Boolean.valueOf(str).booleanValue();
props.remove("enable_diagnostics");
}
str=props.getProperty("diagnostics_addr");
if(str != null) {
diagnostics_addr=str;
props.remove("diagnostics_addr");
}
str=props.getProperty("diagnostics_port");
if(str != null) {
diagnostics_port=Integer.parseInt(str);
props.remove("diagnostics_port");
}
if(enable_bundling) {
//if (use_outgoing_packet_handler == false)
// if(warn) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true");
// use_outgoing_packet_handler=true;
}
return true;
}
/**
* This prevents the up-handler thread to be created, which essentially is superfluous:
* messages are received from the network rather than from a layer below.
* DON'T REMOVE !
*/
@Override // GemStoneAddition
public void startUpHandler() {
}
/**
* handle the UP event.
* @param evt - the event being send from the stack
*/
@Override // GemStoneAddition
public void up(Event evt) {
switch(evt.getType()) {
case Event.CONFIG:
passUp(evt);
if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
handleConfigEvent((HashMap)evt.getArg());
return;
}
passUp(evt);
}
/**
* Caller by the layer above this layer. Usually we just put this Message
* into the send queue and let one or more worker threads handle it. A worker thread
* then removes the Message from the send queue, performs a conversion and adds the
* modified Message to the send queue of the layer below it, by calling down()).
*/
@Override // GemStoneAddition
public void down(Event evt) {
if(evt.getType() != Event.MSG) { // unless it is a message handle it and respond
handleDownEvent(evt);
return;
}
// GemStoneAddition - do not send messages if this thread has been interrupted.
// It is quite possible that the message content is corrupt
if (Thread.currentThread().isInterrupted()) {
return;
}
Message msg=(Message)evt.getArg();
if(header != null) {
// added patch by Roland Kurmann (March 20 2003)
// msg.putHeader(name, new TpHeader(channel_name));
msg.putHeader(name, header);
}
// Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
// This way, we still have performance numbers for TP
if(observer != null)
observer.passDown(evt);
setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !!
if (msg.bundleable && msg.isHighPriority) {
msg.bundleable = false;
}
if (VERBOSE || GemFireTracer.DEBUG) {
StringBuffer sb=new StringBuffer("sending msg ");
sb.append(msg.toString());
if (!msg.isHighPriority && enable_bundling && msg.bundleable) {
sb.append(" bundled");
}
log.getLogWriter().info(ExternalStrings.DEBUG, sb);
}
// Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
// If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
// we will discard our own multicast message
Address dest=msg.getDest();
boolean multicast=dest == null || dest.isMulticastAddress();
if(loopback && (multicast || dest.equals(local_addr))) {
Message copy=msg.copy(true);
// copy.removeHeader(name); // we don't remove the header
copy.setSrc(local_addr);
// copy.setDest(dest);
if(trace) log.trace("looping back message"); // GemStoneAddition - don't log message again
try {
incoming_msg_queue.add(copy);
stack.gfPeerFunctions.setJgQueuedMessagesSize(incoming_msg_queue.size());
}
catch(QueueClosedException e) {
// if (trace) log.error("failed adding looped back message to incoming_msg_queue", e);
}
if(!multicast)
return;
}
try {
if(use_outgoing_packet_handler)
outgoing_queue.put(msg);
else
send(msg, dest, multicast);
}
catch(QueueClosedException closed_ex) {
}
catch(InterruptedException interruptedEx) {
Thread.currentThread().interrupt(); // GemStoneAddition
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (RetransmissionTooLargeException e) {
throw e;
}
catch(Throwable e) {
// GemStoneAddition - look for out-of-buffer-space condition
if ( (e instanceof Exception) && (e.getCause() != null) &&
"No buffer space available".equals(e.getCause().getMessage())) {
log.getLogWriter().warning(
ExternalStrings.TP_OUT_OF_SOCKET_BUFFER_SPACE_INCREASE_0,
(multicast? " mcast-send-buffer-size"
: " udp-send-buffer-size"));
}
//if(log.isErrorEnabled() &&
if (!Thread.currentThread().isInterrupted() // GemStoneAddition - don't log interrupts
) {
if (VERBOSE || GemFireTracer.DEBUG) {
log.getLogWriter().fine("failed sending message", e);
}
}
}
}
/*--------------------------- End of Protocol interface -------------------------- */
/* ------------------------------ Private Methods -------------------------------- */
/**
* If the sender is null, set our own address. We cannot just go ahead and set the address
* anyway, as we might be sending a message on behalf of someone else ! E.gin case of
* retransmission, when the original sender has crashed, or in a FLUSH protocol when we
* have to return all unstable messages with the FLUSH_OK response.
*/
private void setSourceAddress(Message msg) {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
}
/** GemStoneAddition
* this sends a multicast message and waits for the given period of time
* for a response
* @param timeout
* @return true if a response was received
*/
public boolean testMulticast(long timeout) {
byte[] buffer = new byte[4];
buffer[0] = (byte)'p';
buffer[1] = (byte)'i';
buffer[2] = (byte)'n';
buffer[3] = (byte)'g';
this.pongReceived = false;
try {
this.sendToAllMembers(buffer, 0, 4);
} catch (Exception e) {
if (log.getLogWriter().fineEnabled()) {
log.getLogWriter().fine("exception sending ping request", e);
}
return false;
}
long waitEnd = System.currentTimeMillis() + timeout;
long remainingMs = timeout;
synchronized(this.pongSync ) {
try {
while (!this.pongReceived && remainingMs > 0) {
this.pongSync.wait(remainingMs);
remainingMs = waitEnd - System.currentTimeMillis();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return this.pongReceived;
}
}
/**
* Subclasses must call this method when a unicast or multicast message has been received.
* Declared final so subclasses cannot override this method.
*
* @param dest
* @param sender
* @param data
* @param offset
* @param length
*/
protected final void receive(Address dest, Address sender, byte[] data, int offset, int length) {
if(data == null) return;
// if(length == 4) { // received a diagnostics probe
// if(data[offset] == 'd' && data[offset+1] == 'i' && data[offset+2] == 'a' && data[offset+3] == 'g') {
// handleDiagnosticProbe(sender);
// return;
// }
// }
// GemStoneAddition - record time until msg reaches jgmm
long start = 0;
if (stack != null && stack.enableClockStats)
start = nanoTime();
boolean mcast=dest == null || dest.isMulticastAddress();
// if(trace){
// StringBuffer sb=new StringBuffer("received (");
// sb.append(mcast? "mcast)" : "ucast) ").append(length).append(" bytes from ").append(sender);
// log.trace(sb.toString());
// }
// GemStoneAddition - AvailablePort support
if (length == 4) {
if (data[offset] == 'p' && data[offset+1] == 'i' && data[offset+2] == 'n' && data[offset+3] == 'g') {
sendPingResponse(sender);
return;
} else if (data[offset] == 'p' && data[offset+1] == 'o' && data[offset+2] == 'n' && data[offset+3] == 'g') {
synchronized(this.pongSync) {
this.pongReceived = true;
this.pongSync.notifyAll();
}
this.stack.gfPeerFunctions.pongReceived(
((IpAddress)sender).getSocketAddress());
return;
}
}
try {
if(use_incoming_packet_handler) {
byte[] tmp=new byte[length];
System.arraycopy(data, offset, tmp, 0, length);
incoming_packet_queue.add(new IncomingQueueEntry(dest, sender, tmp, offset, length, start)); // GemStoneAddition - pass along start time
}
else
handleIncomingPacket(dest, sender, data, offset, length, start); // GemStoneAddition - pass along start time
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable t) {
if(log.isErrorEnabled())
log.error(ExternalStrings.TP_FAILED_HANDLING_DATA_FROM_0, sender, t);
}
}
/**
* Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
* mcast or unicast socket reads can be concurrent.
* Correction (bela April 19 2005): we access no instance variables, all vars are allocated on the stack, so
* this method should be reentrant: removed 'synchronized' keyword
*/
protected/*GemStoneAddition*/ void handleIncomingPacket(Address dest, Address sender, byte[] data, int offset, int length,
long startTime /* GemStoneAddition */) {
Message msg=null;
List l=null; // used if bundling is enabled
short version;
boolean is_message_list, multicast;
byte flags;
try {
// System.out.println("handleIncomingPacket("+dest+", "+sender+","+data.length+","+offset+")");
synchronized(in_stream) {
in_stream.setData(data, offset, length);
buf_in_stream.reset(length);
version=dis.readShort();
if(JGroupsVersion.compareTo(version) == false) {
if(warn) {
StringBuffer sb=new StringBuffer();
sb.append("packet from ").append(sender).append(" has different version (").append(version);
sb.append(") from ours (").append(JGroupsVersion.printVersion()).append("). ");
if(discard_incompatible_packets)
sb.append("Packet is discarded");
else
sb.append("This may cause problems");
log.warn(sb);
}
if(discard_incompatible_packets)
return;
}
flags=dis.readByte();
is_message_list=(flags & LIST) == LIST;
multicast=(flags & MULTICAST) == MULTICAST;
if(is_message_list) {
l=bufferToList(dis, dest, sender, multicast);
if (l == null) {
return;
}
}
else {
msg=bufferToMessage(dis, dest, sender, multicast);
if (msg == null) {
return;
}
msg.timeStamp = startTime; // GemStoneAddition
}
}
LinkedList msgs=new LinkedList();
if(is_message_list) {
for(Enumeration en=l.elements(); en.hasMoreElements();) {
Message m = (Message)en.nextElement(); // GemStoneAddition
// we use current time instead of startTime
// so that passUp() with no up-threads
// doesn't artificially inflate the channel time for
// bundled messages
if (stack.enableClockStats)
m.timeStamp = nanoTime();
msgs.add(m);
}
}
else
msgs.add(msg);
Address src;
for(Iterator it=msgs.iterator(); it.hasNext();) {
msg=(Message)it.next();
src=msg.getSrc();
// GemStoneAddition - this loop was very poorly written & has been recoded
if (loopback
&& multicast && src != null && local_addr.equals(src)) {
if (trace) {
log.getLogWriter().info(ExternalStrings.DEBUG, "discarding my own multicast message " + msg);
}
it.remove();
continue;
}
else {
// if (incoming_msg_queue == null || msg.isHighPriority) {
handleIncomingMessage(msg);
it.remove();
// }
}
}
// GemStoneAddition - we want to use loopback but not queue all
// messages coming over the wire. Non-loopback messages should
// just be dispatched
// if(incoming_msg_queue != null && msgs.size() > 0) {
// incoming_msg_queue.addAll(msgs);
// if (stack.gemfireStats != null) {
// stack.gemfireStats.setJgQueuedMessagesSize(incoming_msg_queue.size());
// }
// }
}
catch(QueueClosedException closed_ex) {
; // swallow exception
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable t) {
// if(log.isErrorEnabled()) {
log.getLogWriter().info(ExternalStrings.TP_FAILED_UNMARSHALLING_MESSAGE_FROM__0,
sender, t);
// }
}
}
protected/*GemStoneAddition*/ void handleIncomingMessage(Message msg) {
Event evt;
TpHeader hdr;
if(stats) {
num_msgs_received++;
num_bytes_received+=msg.getLength();
}
evt=new Event(Event.MSG, msg);
if(VERBOSE || GemFireTracer.DEBUG) {
// GemStoneAddition - headers are always displayed in Message.toString() now
StringBuffer sb=new StringBuffer("received message ").append(msg);; // .append(", headers are ").append(msg.getHeaders());
log.getLogWriter().info(ExternalStrings.DEBUG, sb);
}
/* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
* This allows e.g. PerfObserver to get the time of reception of a message */
if(observer != null)
observer.up(evt, up_queue.size());
hdr=(TpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
if(hdr != null) {
/* Discard all messages destined for a channel with a different name */
String ch_name=hdr.channel_name;
// Discard if message's group name is not the same as our group name unless the
// message is a diagnosis message (special group name DIAG_GROUP)
// GemStoneAddition: now we support a different group name by
// transparently adjusting serialization and semantics for older
// versions till the min specified in Version.OLDEST_P2P_SUPPORTED
if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) /* &&
!ch_name.equals(Util.DIAG_GROUP)*/) { // GemStoneAddition - remove a string compare
if(warn)
log.warn(new StringBuilder("discarded message from different group \"").append(ch_name).
append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc()));
return;
}
// GemStoneAddition - remove the TP header to reduce footprint in UNICAST/NAKACK receive window
msg.removeHeader(name);
}
else {
if(trace)
log.trace(new StringBuffer("message does not have a transport header, msg is ").append(msg).
append(", headers are ").append(msg.getHeaders()).append(", will be discarded"));
return;
}
// gemfire addition.
long sTime = 0;
if (stack.enableJgStackStats)
sTime = nanoTime();
passUp(evt);
// gemfire addition.
if (stack.enableJgStackStats)
stack.gfPeerFunctions.incjgUpTime(nanoTime() - sTime);
}
/** Internal method to serialize and send a message. This method is not reentrant */
protected/*GemStoneAddition*/ void send(Message msg, Address dest, boolean multicast) throws Exception {
if(enable_bundling) { // GemStoneAddition - bundleable
if (msg.bundleable && multicast) {
bundler.send(msg, dest);
return;
}
//else { [bruce] this is more correct, but has a perf impact. sending out of order doesn't seem
// to cause problems
// flush any bundled messages so we don't have ordering problems
// bundler.bundleAndSend();
//}
}
// if (log.isDebugEnabled()) // GemStoneAddition - for last ditch debugging
// log.debug("sending message unbundled"); // + msg.toString() + " headers: " + msg.printObjectHeaders());
// Needs to be synchronized because we can have possible concurrent access, e.g.
// Discovery uses a separate thread to send out discovery messages
// We would *not* need to sync between send(), OutgoingPacketHandler and BundlingOutgoingPacketHandler,
// because only *one* of them is enabled
Buffer buf;
// synchronized(out_stream) {
buf=messageToBuffer(msg, multicast);
if (buf.getLength() > 60000) {
if (com.gemstone.org.jgroups.protocols.pbcast.NAKACK.isRetransmission(msg)) {
throw new com.gemstone.org.jgroups.protocols.pbcast.NAKACK.RetransmissionTooLargeException("serialized size is " + buf.getLength());
}
}
doSend(buf, msg.isJoinResponse, dest, multicast);
// }
}
protected/*GemStoneAddition*/ void doSend(Buffer buf, boolean isJoinResponse, Address dest, boolean multicast) throws Exception {
if(stats) {
num_msgs_sent++;
num_bytes_sent+=buf.getLength();
}
if(multicast) {
sendToAllMembers(buf.getBuf(), buf.getOffset(), buf.getLength());
}
else {
sendToSingleMember(dest, isJoinResponse, buf.getBuf(), buf.getOffset(), buf.getLength());
}
}
/**
* This method needs to be synchronized on out_stream when it is called
* @param msg
* @return Buffer
* @throws java.io.IOException
*/
private Buffer messageToBuffer(Message msg, boolean multicast) throws Exception {
Buffer retval;
byte flags=0;
// out_stream.reset();
// buf_out_stream.reset(out_stream.getCapacity());
ExposedByteArrayOutputStream out_stream = new ExposedByteArrayOutputStream((int)msg.size());
ExposedBufferedOutputStream buf_out_stream = new ExposedBufferedOutputStream(out_stream, (int)msg.size());
DataOutputStream dos = new DataOutputStream(buf_out_stream); //dos.reset();
dos.writeShort(JGroupsVersion.version); // write the version
short msgVersion = 0;
if(multicast) {
flags+=MULTICAST;
msgVersion = getMulticastVersion();
}
dos.writeByte(flags);
if (multicast) {
JGroupsVersion.writeOrdinal(dos, msgVersion, true);
}
// preMarshalling(msg, dest, src); // allows for optimization by subclass
msg.setVersion(msgVersion);
msg.writeTo(dos);
// postMarshalling(msg, dest, src); // allows for optimization by subclass
dos.flush();
retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
return retval;
}
protected Message bufferToMessage(DataInputStream instream, Address dest, Address sender, boolean multicast) throws Exception {
short mcastVersion = JChannel.getGfFunctions().getCurrentVersionOrdinal();
if (multicast) {
short mcastOrdinal = JGroupsVersion.readOrdinal(instream);
if (mcastOrdinal > mcastVersion) {
// newer mcast packet than this VM can handle - ignore it
log.getLogWriter().info(ExternalStrings.IGNORING_MULTICAST_MESSAGE_WITH_HIGHER_VERSION_FROM_0_1_2,
new Object[]{sender, mcastOrdinal, mcastVersion});
return null;
}
mcastVersion = mcastOrdinal;
}
Message msg=new Message(false); // don't create headers, readFrom() will do this
msg.setVersion(mcastVersion);
if (log.isDebugEnabled()) {
log.debug("deserializing a message from " + sender);
}
msg.readFrom(instream);
postUnmarshalling(msg, dest, sender, multicast); // allows for optimization by subclass
return msg;
}
protected/*GemStoneAddition*/ Buffer listToBuffer(List l, boolean multicast) throws Exception {
Buffer retval;
// Address src; GemStoneAddition
Message msg;
byte flags=0;
int len=l != null? l.size() : 0;
// boolean src_written=false; GemStoneAddition
// out_stream.reset();
// buf_out_stream.reset(out_stream.getCapacity());
ExposedByteArrayOutputStream out_stream = new ExposedByteArrayOutputStream(65535);
ExposedBufferedOutputStream buf_out_stream = new ExposedBufferedOutputStream(out_stream, 65535);
// dos.reset();
DataOutputStream dos = new DataOutputStream(buf_out_stream);
dos.writeShort(JGroupsVersion.version);
flags+=LIST;
short mcastVersion = 0;
if(multicast) {
flags+=MULTICAST;
}
dos.writeByte(flags);
if (multicast) {
mcastVersion = getMulticastVersion();
// record the version for reading so we know how to deserialize the address
JGroupsVersion.writeOrdinal(dos, mcastVersion, true);
dos = stack.gfBasicFunctions.getVersionedDataOutputStream(dos, mcastVersion);
}
dos.writeInt(len);
boolean writeaddrs = true;
if (local_addr != null) {
writeaddrs=false;
dos.writeBoolean(false);
Util.writeAddress(local_addr, dos);
}
else {
dos.writeBoolean(true);
}
if (l != null) { // GemStoneAddition
for(Enumeration en=l.elements(); en.hasMoreElements();) {
msg=(Message)en.nextElement();
//src=msg.getSrc();
//if(!src_written) {
// Util.writeAddress(src, dos);
// src_written=true;
//}
boolean resurrect = false;
if (!writeaddrs) {
if (msg.getSrc().equals(local_addr)) {
msg.setSrc(null);
resurrect=true;
}
}
if (multicast) {
msg.setVersion(mcastVersion);
}
msg.writeTo(dos);
if (resurrect)
msg.setSrc(local_addr);
}
} // GemStoneAddition
dos.flush();
retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
return retval;
}
/**
* GemStoneAddition. When mcasting we choose the oldest version in the
* current membership set for serializing messages.
*/
private void determineMulticastVersion() {
short currOrdinal = JChannel.getGfFunctions().getCurrentVersionOrdinal();
short result = currOrdinal;
synchronized(this.members) {
for (Iterator it=this.members.iterator(); it.hasNext(); ) {
short ver = ((Address)it.next()).getVersionOrdinal();
if (ver < currOrdinal) {
if (result == 0) {
result = ver;
} else if (ver < result) {
result = ver;
}
}
}
}
this.multicastVersion = result;
Message.multicastVersion = result;
}
private short getMulticastVersion() {
synchronized(this.members) {
if (this.multicastVersion <= 0) {
return JChannel.getGfFunctions().getCurrentVersionOrdinal();
} else {
return this.multicastVersion;
}
}
}
private List bufferToList(DataInputStream instream, Address dest, Address source, boolean multicast) throws Exception { // GemStoneAddition - source parameter
List l=new List();
DataInputStream in=null;
int len;
Message msg;
Address src = null;
short currentVersionOrdinal = stack.gfBasicFunctions.getCurrentVersionOrdinal();
try {
if (multicast) {
short mcastOrdinal = JGroupsVersion.readOrdinal(instream);
if (mcastOrdinal > currentVersionOrdinal) {
// newer mcast packet than this VM can handle - ignore it
log.getLogWriter().info(ExternalStrings.IGNORING_MULTICAST_MESSAGE_WITH_HIGHER_VERSION_FROM_0_1_2,
new Object[]{source, mcastOrdinal, currentVersionOrdinal});
return null;
}
if (mcastOrdinal < currentVersionOrdinal) {
instream = stack.gfBasicFunctions.getVersionedDataInputStream(instream, mcastOrdinal);
}
}
len=instream.readInt();
boolean readaddr = !instream.readBoolean();
if (readaddr) {
src = Util.readAddress(instream);
// GemStoneAddition - canonical source addresses
synchronized(this.members) {
// GemStoneAddition - canonical member IDs
int idx = this.members.indexOf(src);
if (idx >= 0) {
src = (Address)this.members.get(idx);
}
}
}
//src=Util.readAddress(instream); GemStoneAddition - bug 34280 - messages contain their source address
for(int i=0; i < len; i++) {
msg=new Message(false); // don't create headers, readFrom() will do this
msg.readFrom(instream);
if (readaddr && (msg.getSrc() == null))
msg.setSrc(src);
postUnmarshallingList(msg, dest, multicast);
l.add(msg);
}
return l;
}
finally {
Util.closeInputStream(in);
}
}
/**
*
* @param s
* @return List of NetworkInterface
*/
private java.util.List parseInterfaceList(String s) throws Exception {
java.util.List interfaces=new ArrayList(10);
if(s == null)
return null;
StringTokenizer tok=new StringTokenizer(s, ",");
String interface_name;
NetworkInterface intf;
while(tok.hasMoreTokens()) {
interface_name=tok.nextToken();
// try by name first (e.g. (eth0")
intf=NetworkInterface.getByName(interface_name);
// next try by IP address or symbolic name
if(intf == null)
intf=NetworkInterface.getByInetAddress(InetAddress.getByName(interface_name));
if(intf == null)
throw new Exception("interface " + interface_name + " not found");
if(interfaces.contains(intf)) {
log.warn("did not add interface " + interface_name + " (already present in " + print(interfaces) + ")");
}
else {
interfaces.add(intf);
}
}
return interfaces;
}
private String print(java.util.List interfaces) {
StringBuffer sb=new StringBuffer();
boolean first=true;
NetworkInterface intf;
for(Iterator it=interfaces.iterator(); it.hasNext();) {
intf=(NetworkInterface)it.next();
if(first) {
first=false;
}
else {
sb.append(", ");
}
sb.append(intf.getName());
}
return sb.toString();
}
protected void handleDownEvent(Event evt) {
switch(evt.getType()) {
case Event.TMP_VIEW:
case Event.VIEW_CHANGE:
synchronized(members) {
view=(View)evt.getArg();
members.clear();
Vector tmpvec=view.getMembers();
members.addAll(tmpvec);
determineMulticastVersion();
}
break;
case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
break;
case Event.CONNECT:
channel_name=(String)evt.getArg();
header=new TpHeader(channel_name);
// removed March 18 2003 (bela), not needed (handled by GMS)
// changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
// be needed if we run without GMS though
passUp(new Event(Event.CONNECT_OK));
break;
case Event.CONNECT_OK: // GemStoneAddition - connect_ok handling
// this.connected = true;
break;
case Event.DISCONNECT:
passUp(new Event(Event.DISCONNECT_OK));
break;
case Event.CONFIG:
if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
handleConfigEvent((HashMap)evt.getArg());
break;
}
}
protected void handleConfigEvent(HashMap map) {
if(map == null) return;
if(map.containsKey("additional_data"))
additional_data=(byte[])map.get("additional_data");
}
/* ----------------------------- End of Private Methods ---------------------------------------- */
/* ----------------------------- Inner Classes ---------------------------------------- */
static/*GemStoneAddition*/ class IncomingQueueEntry {
Address dest=null;
Address sender=null;
byte[] buf;
int offset, length;
long timeStamp; // GemStoneAddition
IncomingQueueEntry(Address dest, Address sender, byte[] buf, int offset, int length, long timeStamp /* GemStoneAddition */) {
this.dest=dest;
this.sender=sender;
this.buf=buf;
this.offset=offset;
this.length=length;
this.timeStamp = timeStamp; // GemStoneAddition
}
}
/**
* This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
* to the higher layer (done in handleIncomingUdpPacket()).
*/
class IncomingPacketHandler implements Runnable {
// GemStoneAddition #t must be synchronized on this
Thread t=null;
synchronized /* GemStoneAddition */ void start() {
if(t == null || !t.isAlive()) {
t=new Thread(this, "UDP Incoming Packet Handler"); // GemStoneAddition - prefix with "UDP" for DM.isPreciousThread()
t.setDaemon(true);
t.start();
}
}
synchronized /* GemStoneAddition */ void stop() {
incoming_packet_queue.close(true); // should terminate the packet_handler thread too
if (t != null)
t.interrupt(); // GemStoneAddition
t=null;
}
public void run() {
IncomingQueueEntry entry;
for (;;) { // GemStoneAddition - remove coding anti-pattern
if (incoming_packet_queue.closed()) break; // GemStoneAddition
if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition -- for safety
try {
entry=(IncomingQueueEntry)incoming_packet_queue.remove();
handleIncomingPacket(entry.dest, entry.sender, entry.buf, entry.offset, entry.length,
entry.timeStamp); // GemStoneAddition - timestamp
}
// GemStoneAddition - handle interrupts by exiting
catch (InterruptedException ie) {
if (log.isTraceEnabled()) log.trace("packet handler thread terminating (interrupted)");
// Thread.currentThread().interrupt(); not really necessary...
break;
}
catch(QueueClosedException closed_ex) {
break;
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable ex) {
if(log.isErrorEnabled())
log.error(ExternalStrings.TP_ERROR_PROCESSING_INCOMING_PACKET, ex);
}
}
if(trace) log.trace("incoming packet handler terminating");
}
}
class IncomingMessageHandler implements Runnable {
Thread t; // GemStoneAddition -- accesses synchronized on this
// int i=0; GemStoneAddition
synchronized /* GemStoneAddition */ public void start() {
if(t == null || !t.isAlive()) {
t=new Thread(this, "UDP Loopback Message Handler");
t.setDaemon(true);
t.setPriority(Thread.MAX_PRIORITY);
t.start();
}
}
synchronized /* GemStoneAddition */ public void stop() {
// GemStoneAddition - bug #40633, OOME during shutdown due to this queue
// not draining
incoming_msg_queue.close(false);
if (t != null) t.interrupt(); // GemStoneAddition
t=null;
}
public void run() {
Message msg;
for (;;) { // GemStoneAddition - avoid coding anti-pattern
if (incoming_msg_queue.closed()) break; // GemStoneAddition
if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
try {
msg=(Message)incoming_msg_queue.remove();
// if (trace) log.trace("removed message from loopback queue: " + msg);
handleIncomingMessage(msg);
}
catch (InterruptedException ie) { // GemStoneAddition
break; // exit loop and thread
}
catch(QueueClosedException closed_ex) {
break;
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable ex) {
if(log.isErrorEnabled())
log.error(ExternalStrings.TP_ERROR_PROCESSING_INCOMING_MESSAGE, ex);
}
}
if(trace) log.trace("incoming message handler terminating");
}
}
/**
* This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them
* using the unicast or multicast socket
*/
class OutgoingPacketHandler implements Runnable {
// GemStoneAddition #t must be synchronized on this
Thread t=null;
// byte[] buf; GemStoneAddition
// DatagramPacket packet; GemStoneAddition
synchronized /* GemStoneAddition */ void start() {
if(t == null || !t.isAlive()) {
t=new Thread(this, "OutgoingPacketHandler");
t.setDaemon(true);
t.start();
}
}
synchronized /* GemStoneAddition*/ void stop() {
Thread tmp=t;
t=null;
if(tmp != null) {
tmp.interrupt();
}
}
public void run() {
Message msg;
boolean multicast = false; // GemStoneAddition
for (;;) { // GemStoneAddition remove anti-pattern
if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
try {
msg=(Message)outgoing_queue.take();
multicast = msg.getDest() == null || msg.getDest().isMulticastAddress(); // GemStoneAddition
handleMessage(msg);
}
catch(QueueClosedException closed_ex) {
break;
}
catch(InterruptedException interruptedEx) {
break; // GemStoneAddition; exit loop and thread
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable th) {
// GemStoneAddition - look for out-of-buffer-space condition
if ( (th instanceof Exception) && (th.getCause() != null) &&
"No buffer space available".equals(th.getCause().getMessage())) {
log.getLogWriter().warning(
ExternalStrings.TP_OUT_OF_SOCKET_BUFFER_SPACE_INCREASE_0,
(multicast? " mcast-send-buffer-size"
: " udp-send-buffer-size"));
}
else {
if(log.isErrorEnabled()) log.error(ExternalStrings.TP_EXCEPTION_SENDING_PACKET, th);
}
}
msg=null; // let's give the garbage collector a hand... this is probably useless though
}
if(trace) log.trace("outgoing message handler terminating");
}
protected void handleMessage(Message msg) throws Throwable {
Address dest=msg.getDest();
send(msg, dest, dest == null || dest.isMulticastAddress());
}
}
protected/*GemStoneAddition*/ class Bundler {
/** HashMap key=Address, value=List of Message. Keys are destinations, values are lists of Messages */
final HashMap msgs=new HashMap(36);
long count=0; // current number of bytes accumulated
int num_msgs=0;
long start=0;
BundlingTimer bundling_timer=null;
protected/*GemStoneAddition*/ synchronized void send(Message msg, Address dest) throws Exception {
long gsstart = 0;
if (stack.enableClockStats)
gsstart = nanoTime(); // GemStoneAddition - statistics
try {
long length=msg.size();
//if (trace)
// log.trace("bundling " + msg.toString() + " headers: " + msg.printObjectHeaders());
//if (length > max_bundle_size) { // GemStone - for debugging
// msg.dumpPayload();
// log.trace("bundler: bad message size " + msg);
//}
if (checkLength(length)) { // GemStoneAddition - dump msg if too long
log.getLogWriter().warning(ExternalStrings.TP_SENDING_OVERSIZED_MESSAGE__0__HEADERS__1, new Object[] {msg.toString(), msg.printObjectHeaders()});
log.getLogWriter().warning(ExternalStrings.TP_PAYLOAD___0, msg.toStringAsObject());
}
//if(start == 0)
// start=System.currentTimeMillis(); // GemStoneAddition - this isn't used
if(count + length >= max_bundle_size) {
cancelTimer();
bundleAndSend(); // clears msgs and resets num_msgs
}
addMessage(msg, dest);
count+=length;
startTimer(); // start timer if not running
}
finally { // GemStoneAddition
if (stack.enableClockStats) {
if (log.isDebugEnabled()) // GemStoneAddition - debugging
log.debug("bundled " + msg.toString() + " headers: " + msg.printObjectHeaders());
if (stack.enableClockStats)
stack.gfPeerFunctions.incBatchSendTime(gsstart);
}
}
}
/** Never called concurrently with cancelTimer - no need for synchronization */
private void startTimer() {
if(bundling_timer == null || bundling_timer.cancelled()) {
bundling_timer=new BundlingTimer();
timer.add(bundling_timer);
}
}
/** Never called concurrently with startTimer() - no need for synchronization */
private void cancelTimer() {
if(bundling_timer != null) {
bundling_timer.cancel();
bundling_timer=null;
}
}
private void addMessage(Message msg, Address dest) { // no sync needed, never called by multiple threads concurrently
List tmp;
synchronized(msgs) {
tmp=(List)msgs.get(dest);
if(tmp == null) {
tmp=new List();
msgs.put(dest, tmp);
}
tmp.add(msg);
num_msgs++;
}
}
public void bundleAndSend() { // GemStoneAddition - was private
Map.Entry entry;
Address dst;
Buffer buffer;
List l;
synchronized(msgs) {
if(msgs.size() == 0)
return;
try {
if(trace) {
long stop=System.currentTimeMillis();
double percentage=100.0 / max_bundle_size * count;
StringBuffer sb=new StringBuffer("sending ").append(num_msgs).append(" msgs (");
sb.append(count).append(" bytes (" + f.format(percentage) + "% of max_bundle_size), collected in "+
+ (stop-start) + "ms) to ").append(msgs.size()).
append(" destination(s)");
if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
log.trace(sb.toString());
}
boolean multicast;
for(Iterator it=msgs.entrySet().iterator(); it.hasNext();) {
entry=(Map.Entry)it.next();
l=(List)entry.getValue();
if(l.size() == 0)
continue;
dst=(Address)entry.getKey();
multicast=dst == null || dst.isMulticastAddress();
// synchronized(out_stream) {
try {
// GemStoneAddition - stats
// long start = 0;
if (stack.enableClockStats)
start = nanoTime();
buffer=listToBuffer(l, multicast);
if (stack.enableClockStats) {
stack.gfPeerFunctions.incBatchCopyTime(start);
start = nanoTime();
}
doSend(buffer, false, dst, multicast);
if (stack.enableClockStats)
stack.gfPeerFunctions.incBatchFlushTime(start);
}
catch (VirtualMachineError err) { // GemStoneAddition
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch(Throwable e) {
// GemStoneAddition - look for out-of-buffer-space condition
if ( (e instanceof Exception) && (e.getCause() != null) &&
"No buffer space available".equals(e.getCause().getMessage())) {
log.getLogWriter().warning(
ExternalStrings.TP_OUT_OF_SOCKET_BUFFER_SPACE_INCREASE_0,
(multicast? " mcast-send-buffer-size"
: " udp-send-buffer-size"));
}
else {
if(log.isErrorEnabled()) log.error(ExternalStrings.TP_EXCEPTION_SENDING_MSG, e);
}
}
// }
}
}
finally {
msgs.clear();
num_msgs=0;
start=0;
count=0;
}
}
}
private boolean checkLength(long len) throws Exception { // GemStoneAddition - don't throw exceptions
return (len > max_bundle_size);
// throw new Exception("message size (" + len + ") is greater than max bundling size (" + max_bundle_size +
// "). Set the fragmentation/bundle size in FRAG and TP correctly");
}
protected/*GemStoneAddition*/ class BundlingTimer implements TimeScheduler.Task {
boolean cancelled=false;
void cancel() {
cancelled=true;
}
public boolean cancelled() {
return cancelled;
}
public long nextInterval() {
return max_bundle_timeout;
}
public void run() {
bundleAndSend();
cancelled=true;
}
}
}
private class DiagnosticsHandler implements Runnable {
/**
* GemStoneAddition: volatile reads OK, updates synchronized on this.
*/
volatile Thread t=null;
/**
* GemStoneAddition: volatile reads OK, updates synchronized on this.
*/
volatile MulticastSocket diag_sock=null;
DiagnosticsHandler() {
}
synchronized /* GemStoneAddition */ void start() throws IOException {
diag_sock=new MulticastSocket(diagnostics_port);
java.util.List interfaces=Util.getAllAvailableInterfaces();
bindToInterfaces(interfaces, diag_sock);
if(t == null || !t.isAlive()) {
t=new Thread(this, "DiagnosticsHandler");
t.setDaemon(true);
t.start();
}
}
synchronized /* GemStoneAddition */ void stop() {
if(diag_sock != null)
diag_sock.close();
if (t != null) t.interrupt();
t=null;
}
public void run() {
byte[] buf=new byte[1500]; // MTU on most LANs
DatagramPacket packet;
for (;;) { // GemStoneAddition - avoid coding anti-pattern
if (diag_sock.isClosed()) break; // GemStoneAddition
if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
packet=new DatagramPacket(buf, 0, buf.length);
try {
diag_sock.receive(packet);
handleDiagnosticProbe(packet.getSocketAddress(), diag_sock,
new String(packet.getData(), packet.getOffset(), packet.getLength()));
}
catch(IOException e) {
}
}
}
private void bindToInterfaces(java.util.List interfaces, MulticastSocket s) throws IOException {
SocketAddress group_addr=new InetSocketAddress(diagnostics_addr, diagnostics_port);
for(Iterator it=interfaces.iterator(); it.hasNext();) {
NetworkInterface i=(NetworkInterface)it.next();
try {
s.joinGroup(group_addr, i);
if(trace)
log.trace("joined " + group_addr + " on " + i.getName());
}
catch(IOException e) {
log.warn("failed to join " + group_addr + " on " + i.getName() + ": " + e);
}
}
}
}
}