| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.tcp; |
| |
| import java.io.BufferedInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InterruptedIOException; |
| import java.io.OutputStream; |
| import java.net.ConnectException; |
| import java.net.Inet6Address; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.CancelledKeyException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.ClosedSelectorException; |
| import java.nio.channels.SocketChannel; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.CacheClosedException; |
| import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; |
| import com.gemstone.gemfire.distributed.internal.ConflationKey; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.DMStats; |
| import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.DistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.DistributionStats; |
| import com.gemstone.gemfire.distributed.internal.ReplyException; |
| import com.gemstone.gemfire.distributed.internal.ReplyMessage; |
| import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; |
| import com.gemstone.gemfire.distributed.internal.ReplySender; |
| import com.gemstone.gemfire.distributed.internal.direct.DirectChannel; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.distributed.internal.membership.MembershipManager; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.ByteArrayDataInput; |
| import com.gemstone.gemfire.internal.DSFIDFactory; |
| import com.gemstone.gemfire.internal.InternalDataSerializer; |
| import com.gemstone.gemfire.internal.SocketCreator; |
| import com.gemstone.gemfire.internal.SocketUtils; |
| import com.gemstone.gemfire.internal.SystemTimer; |
| import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; |
| import com.gemstone.gemfire.internal.logging.log4j.AlertAppender; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.tcp.MsgReader.Header; |
| import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore; |
| import com.gemstone.org.jgroups.util.StringId; |
| |
| /** <p>Connection is a socket holder that sends and receives serialized |
| message objects. A Connection may be closed to preserve system |
| resources and will automatically be reopened when it's needed.</p> |
| |
| @author Bruce Schuchardt |
| @since 2.0 |
| |
| */ |
| |
| public class Connection implements Runnable { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final int INITIAL_CAPACITY = Integer.getInteger("p2p.readerBufferSize", 32768).intValue(); |
| private static int P2P_CONNECT_TIMEOUT; |
| private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false; |
| |
| public final static int NORMAL_MSG_TYPE = 0x4c; |
| public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg |
| public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks |
| public final static int DIRECT_ACK_BIT = 0x20; |
| //We no longer support early ack |
| //public final static int EARLY_ACK_BIT = 0x10; |
| |
| public static final int MSG_HEADER_SIZE_OFFSET = 0; |
| public static final int MSG_HEADER_TYPE_OFFSET = 4; |
| public static final int MSG_HEADER_ID_OFFSET = 5; |
| public static final int MSG_HEADER_BYTES = 7; |
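  // On-the-wire layout of the 7-byte message header (derived from the code
  // below, e.g. calcHdrSize and handshakeStream):
  //   bytes 0-3: HANDSHAKE_VERSION in the high byte, message size in the
  //              low 24 bits
  //   byte  4  : message type (NORMAL_MSG_TYPE, CHUNKED_MSG_TYPE, ...)
  //   bytes 5-6: message id (short)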
| |
| /** |
| * Small buffer used for send socket buffer on receiver connections |
| * and receive buffer on sender connections. |
| */ |
| public final static int SMALL_BUFFER_SIZE = Integer.getInteger("gemfire.SMALL_BUFFER_SIZE",4096).intValue(); |
| |
| /** counter to give connections a unique id */ |
| private static AtomicLong idCounter = new AtomicLong(1); |
| |
| /** the table holding this connection */ |
| final ConnectionTable owner; |
| |
| /** true if connection is a shared resource that can be used by more than one thread */ |
| private boolean sharedResource; |
| |
| public final boolean isSharedResource() { |
| return this.sharedResource; |
| } |
| |
| /** The idle timeout timer task for this connection */ |
| private SystemTimerTask idleTask; |
| |
| /** |
| * Returns the depth of unshared reader threads from this thread to |
| * the original non-reader-thread. |
| * E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) -> reader(domino=3) |
| */ |
| public static int getDominoCount() { |
| return dominoCount.get().intValue(); |
| } |
| |
| private final static ThreadLocal isReaderThread = new ThreadLocal(); |
  // marks the calling thread as a reader thread
  public final static void makeReaderThread() {
    isReaderThread.set(Boolean.TRUE);
  }
| public final static boolean isReaderThread() { |
| Object o = isReaderThread.get(); |
| if (o == null) { |
| return false; |
| } else { |
| return ((Boolean)o).booleanValue(); |
| } |
| } |
| |
| private int getP2PConnectTimeout() { |
| if(IS_P2P_CONNECT_TIMEOUT_INITIALIZED) |
| return P2P_CONNECT_TIMEOUT; |
| String connectTimeoutStr = System.getProperty("p2p.connectTimeout"); |
| if(connectTimeoutStr != null) { |
| P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr); |
| } |
| else { |
| P2P_CONNECT_TIMEOUT = 6*this.owner.owner.getDM().getConfig().getMemberTimeout(); |
| } |
| IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true; |
| return P2P_CONNECT_TIMEOUT; |
| } |
| /** |
| * If true then readers for thread owned sockets will send all messages on thread owned senders. |
| * Even normally unordered msgs get send on TO socks. |
| */ |
| private static final boolean DOMINO_THREAD_OWNED_SOCKETS = Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS"); |
| private final static ThreadLocal isDominoThread = new ThreadLocal(); |
  // if domino sockets are enabled, mark the calling thread as one that sends
  // all of its messages on thread-owned sockets and return true
| public final static boolean tipDomino() { |
| if (DOMINO_THREAD_OWNED_SOCKETS) { |
| // mark this thread as one who wants to send ALL on TO sockets |
| ConnectionTable.threadWantsOwnResources(); |
| isDominoThread.set(Boolean.TRUE); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| public final static boolean isDominoThread() { |
| Object o = isDominoThread.get(); |
| if (o == null) { |
| return false; |
| } else { |
| return ((Boolean)o).booleanValue(); |
| } |
| } |
| |
| /** the socket entrusted to this connection */ |
| private final Socket socket; |
| |
| /** the non-NIO output stream */ |
| OutputStream output; |
| |
| /** output stream/channel lock */ |
| private final Object outLock = new Object(); |
| |
| /** the ID string of the conduit (for logging) */ |
| String conduitIdStr; |
| |
| /** remoteId identifies the remote conduit's listener. It does NOT |
| identify the "port" that this connection's socket is attached |
| to, which is a different thing altogether */ |
| Stub remoteId; |
| |
| /** Identifies the java group member on the other side of the connection. */ |
| InternalDistributedMember remoteAddr; |
| |
| /** |
| * Identifies the version of the member on the other side of the connection. |
| */ |
| Version remoteVersion; |
| |
| /** |
| * True if this connection was accepted by a listening socket. |
| * This makes it a receiver. |
| * False if this connection was explicitly created by a connect call. |
| * This makes it a sender. |
| */ |
| private final boolean isReceiver; |
| |
| /** |
| * count of how many unshared p2p-readers removed from the original action this |
| * thread is. For instance, server-connection -> owned p2p reader (count 0) |
| * -> owned p2p reader (count 1) -> owned p2p reader (count 2). This shows |
| * up in thread names as "DOM #x" (domino #x) |
| */ |
| private static ThreadLocal<Integer> dominoCount = new ThreadLocal<Integer>() { |
| @Override |
| protected Integer initialValue() { |
| return 0; |
| }}; |
| |
| // /** |
| // * name of sender thread thread. Useful in finding out why a reader |
| // * thread was created. Add sending of the name in handshakes and |
| // * add it to the name of the reader thread (the code is there but commented out) |
| // */ |
| // private String senderName = null; |
| |
  // If we are a sender then we want to know if the receiver on the
  // other end is willing to have its messages queued. The following
  // four "async" inst vars come from its handshake response.
| /** |
| * How long to wait if receiver will not accept a message before we |
| * go into queue mode. |
| * @since 4.2.2 |
| */ |
| private int asyncDistributionTimeout = 0; |
| /** |
| * How long to wait, |
| * with the receiver not accepting any messages, |
| * before kicking the receiver out of the distributed system. |
| * Ignored if asyncDistributionTimeout is zero. |
| * @since 4.2.2 |
| */ |
| private int asyncQueueTimeout = 0; |
| /** |
| * How much queued data we can have, |
| * with the receiver not accepting any messages, |
| * before kicking the receiver out of the distributed system. |
| * Ignored if asyncDistributionTimeout is zero. |
| * Canonicalized to bytes (property file has it as megabytes |
| * @since 4.2.2 |
| */ |
| private long asyncMaxQueueSize = 0; |
| /** |
| * True if an async queue is already being filled. |
| */ |
| private volatile boolean asyncQueuingInProgress = false; |
| |
| /** |
| * Maps ConflatedKey instances to ConflatedKey instance. |
| * Note that even though the key and value for an entry is the map |
| * will always be "equal" they will not always be "==". |
| */ |
| private final Map conflatedKeys = new HashMap(); |
| |
| //private final Queue outgoingQueue = new LinkedBlockingQueue(); |
| |
| |
  // NOTE: LinkedBlockingQueue has a bug in which removes from the queue
  // cause future offers to increase the size without adding anything to the queue.
  // So I've changed from that backport class to a java.util.LinkedList
| private final LinkedList outgoingQueue = new LinkedList(); |
| |
| /** |
| * Number of bytes in the outgoingQueue. |
| * Used to control capacity. |
| */ |
| private long queuedBytes = 0; |
| |
| /** used for async writes */ |
| Thread pusherThread; |
| |
| /** |
| * The maximum number of concurrent senders sending a message to a single recipient. |
| */ |
| private final static int MAX_SENDERS = Integer.getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL).intValue(); |
| /** |
| * This semaphore is used to throttle how many threads will try to do sends on |
| * this connection concurrently. A thread must acquire this semaphore before it |
| * is allowed to start serializing its message. |
| */ |
| private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS); |
| |
| /** Set to true once the handshake has been read */ |
| volatile boolean handshakeRead = false; |
| volatile boolean handshakeCancelled = false; |
| |
| private volatile int replyCode = 0; |
| |
| private static final byte REPLY_CODE_OK = (byte)69; |
| private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte)70; |
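  // REPLY_CODE_OK_WITH_ASYNC_INFO indicates the reply also carries the
  // receiver's async-distribution-timeout, async-queue-timeout and
  // async-max-queue-size settings plus its product version ordinal;
  // see sendOKHandshakeReply().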
| |
| private final Object handshakeSync = new Object(); |
| |
| /** message reader thread */ |
| Thread readerThread; |
| |
| // /** |
| // * When a thread owns the outLock and is writing to the socket, it must |
| // * be placed in this variable so that it can be interrupted should the |
| // * socket need to be closed. |
| // */ |
| // private volatile Thread writerThread; |
| |
| /** whether the reader thread is, or should be, running */ |
| volatile boolean stopped = true; |
| |
| /** set to true once a close begins */ |
| private final AtomicBoolean closing = new AtomicBoolean(false); |
| |
| volatile boolean readerShuttingDown = false; |
| |
| /** whether the socket is connected */ |
| volatile boolean connected = false; |
| |
| /** |
| * Set to true once a connection finishes its constructor |
| */ |
| volatile boolean finishedConnecting = false; |
| |
| volatile boolean accessed = true; |
| volatile boolean socketInUse = false; |
| volatile boolean timedOut = false; |
| |
| /** |
| * task for detecting ack timeouts and issuing alerts |
| */ |
| private SystemTimer.SystemTimerTask ackTimeoutTask; |
| |
| // State for ackTimeoutTask: transmissionStartTime, ackWaitTimeout, ackSATimeout, ackConnectionGroup, ackThreadName |
| |
| /** millisecond clock at the time message transmission started, if doing |
| * forced-disconnect processing */ |
| long transmissionStartTime; |
| |
| /** ack wait timeout - if socketInUse, use this to trigger SUSPECT processing */ |
| private long ackWaitTimeout; |
| |
| /** ack severe alert timeout - if socketInUse, use this to send alert */ |
| private long ackSATimeout; |
| |
| /** |
| * other connections participating in the current transmission. we notify |
| * them if ackSATimeout expires to keep all members from generating alerts |
| * when only one is slow |
| */ |
| List ackConnectionGroup; |
| |
| /** name of thread that we're currently performing an operation in (may be null) */ |
| String ackThreadName; |
| |
| |
| /** the buffer used for NIO message receipt */ |
| ByteBuffer nioInputBuffer; |
| |
| /** the position of the next message's content */ |
| // int nioMessageStart; |
| |
| /** the length of the next message to be dispatched */ |
| int nioMessageLength; |
| // byte nioMessageVersion; |
| |
| /** the type of message being received */ |
| byte nioMessageType; |
| |
| /** used to lock access to destreamer data */ |
| private final Object destreamerLock = new Object(); |
| /** caches a msg destreamer that is currently not being used */ |
| MsgDestreamer idleMsgDestreamer; |
  /** maps a msgId to the MsgDestreamer used for destreaming chunked
   * messages with nio */
| HashMap destreamerMap; |
| |
| boolean directAck; |
| |
| short nioMsgId; |
| |
| /** whether the length of the next message has been established */ |
| boolean nioLengthSet = false; |
| |
| /** is this connection used for serial message delivery? */ |
| boolean preserveOrder = false; |
| |
| /** number of messages sent on this connection */ |
| private long messagesSent; |
| |
| /** number of messages received on this connection */ |
| private long messagesReceived; |
| |
| /** unique ID of this connection (remote if isReceiver==true) */ |
| private volatile long uniqueId; |
| |
| private int sendBufferSize = -1; |
| private int recvBufferSize = -1; |
| |
| private ReplySender replySender; |
| |
| private void setSendBufferSize(Socket sock) { |
| setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize); |
| } |
| private void setReceiveBufferSize(Socket sock) { |
| setReceiveBufferSize(sock, this.owner.getConduit().tcpBufferSize); |
| } |
| private void setSendBufferSize(Socket sock, int requestedSize) { |
| setSocketBufferSize(sock, true, requestedSize); |
| } |
| private void setReceiveBufferSize(Socket sock, int requestedSize) { |
| setSocketBufferSize(sock, false, requestedSize); |
| } |
| public int getReceiveBufferSize() { |
| return recvBufferSize; |
| } |
| private void setSocketBufferSize(Socket sock, boolean send, int requestedSize) { |
| setSocketBufferSize(sock, send, requestedSize, false); |
| } |
| private void setSocketBufferSize(Socket sock, boolean send, int requestedSize, boolean alreadySetInSocket) { |
| if (requestedSize > 0) { |
| try { |
| int currentSize = send |
| ? sock.getSendBufferSize() |
| : sock.getReceiveBufferSize(); |
| if (currentSize == requestedSize) { |
| if (send) { |
| this.sendBufferSize = currentSize; |
| } |
| return; |
| } |
        if (!alreadySetInSocket) {
          if (send) {
            sock.setSendBufferSize(requestedSize);
          } else {
            sock.setReceiveBufferSize(requestedSize);
          }
        }
| } catch (SocketException ignore) { |
| } |
| try { |
| int actualSize = send |
| ? sock.getSendBufferSize() |
| : sock.getReceiveBufferSize(); |
| if (send) { |
| this.sendBufferSize = actualSize; |
| } else { |
| this.recvBufferSize = actualSize; |
| } |
| if (actualSize < requestedSize) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.Connection_SOCKET_0_IS_1_INSTEAD_OF_THE_REQUESTED_2, |
| new Object[] {(send ? "send buffer size" : "receive buffer size"), Integer.valueOf(actualSize), Integer.valueOf(requestedSize)})); |
| } else if (actualSize > requestedSize) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("Socket {} buffer size is {} instead of the requested {}", |
| (send ? "send" : "receive"), actualSize, requestedSize); |
| } |
| // Remember the request size which is smaller. |
| // This remembered value is used for allocating direct mem buffers. |
| if (send) { |
| this.sendBufferSize = requestedSize; |
| } else { |
| this.recvBufferSize = requestedSize; |
| } |
| } |
| } catch (SocketException ignore) { |
| if (send) { |
| this.sendBufferSize = requestedSize; |
| } else { |
| this.recvBufferSize = requestedSize; |
| } |
| } |
| } |
| } |
| /** |
| * Returns the size of the send buffer on this connection's socket. |
| */ |
| public int getSendBufferSize() { |
| int result = this.sendBufferSize; |
| if (result != -1) { |
| return result; |
| } |
| try { |
| result = getSocket().getSendBufferSize(); |
| } catch (SocketException ignore) { |
| // just return a default |
| result = this.owner.getConduit().tcpBufferSize; |
| } |
| this.sendBufferSize = result; |
| return result; |
| } |
| |
| /** creates a connection that we accepted (it was initiated by |
| * an explicit connect being done on the other side). |
| * We will only receive data on this socket; never send. |
| */ |
| protected static Connection createReceiver(ConnectionTable t, Socket s) |
| throws IOException, ConnectionException |
| { |
| Connection c = new Connection(t, s); |
| boolean readerStarted = false; |
| try { |
| c.startReader(); |
| readerStarted = true; |
| } finally { |
| if (!readerStarted) { |
| c.closeForReconnect(LocalizedStrings.Connection_COULD_NOT_START_READER_THREAD.toLocalizedString()); |
| } |
| } |
| c.waitForHandshake(); |
| //sendHandshakeReplyOK(); |
| c.finishedConnecting = true; |
| return c; |
| } |
| |
| /** creates a connection that we accepted (it was initiated by |
| * an explicit connect being done on the other side). |
| */ |
| private Connection(ConnectionTable t, Socket s) |
| throws IOException, ConnectionException |
| { |
| if (t == null) { |
| throw new IllegalArgumentException(LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString()); |
| } |
| this.isReceiver = true; |
| this.owner = t; |
| this.socket = s; |
| this.conduitIdStr = owner.getConduit().getId().toString(); |
| this.handshakeRead = false; |
| this.handshakeCancelled = false; |
| this.connected = true; |
| |
| try { |
| s.setTcpNoDelay(true); |
| s.setKeepAlive(true); |
| // s.setSoLinger(true, (Integer.valueOf(System.getProperty("p2p.lingerTime", "5000"))).intValue()); |
| setSendBufferSize(s, SMALL_BUFFER_SIZE); |
| setReceiveBufferSize(s); |
| } |
| catch (SocketException e) { |
| // unable to get the settings we want. Don't log an error because it will |
| // likely happen a lot |
| } |
| if (!useNIO()) { |
| try { |
| //this.output = new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE); |
| this.output = s.getOutputStream(); |
| } |
| catch (IOException io) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io); |
| SocketCreator.asyncClose(s, this.remoteAddr.toString(), null); |
| throw io; |
| } |
| } |
| } |
| |
| void setIdleTimeoutTask(SystemTimerTask task) { |
| this.idleTask = task; |
| } |
| |
| |
| /** |
| * Returns true if an idle connection was detected. |
| */ |
| public boolean checkForIdleTimeout() { |
| if (isSocketClosed()) { |
| return true; |
| } |
| if (isSocketInUse()) { |
| return false; |
| } |
| boolean isIdle = !this.accessed; |
| this.accessed = false; |
| if (isIdle) { |
| this.timedOut = true; |
| this.owner.getConduit().stats.incLostLease(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource, this.preserveOrder); |
| } |
| try { |
        // Instead of calling requestClose
        // we call closeForReconnect.
        // We don't want this timeout close to close
        // any other connections. The problem with
        // requestClose is that it sets removeEndpoint to true,
        // which will close any receivers we have if this
        // connection is a shared one.
| closeForReconnect(LocalizedStrings.Connection_IDLE_CONNECTION_TIMED_OUT.toLocalizedString()); |
| } catch (Exception ignore) {} |
| } |
| return isIdle; |
| } |
| |
| static private byte[] okHandshakeBytes; |
| static private ByteBuffer okHandshakeBuf; |
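  // The plain OK handshake reply never changes (a standard 7-byte message
  // header followed by the single REPLY_CODE_OK byte), so it is built once
  // here and reused.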
| static { |
| int msglen = 1; // one byte for reply code |
| byte[] bytes = new byte[MSG_HEADER_BYTES + msglen]; |
| msglen = calcHdrSize(msglen); |
| bytes[MSG_HEADER_SIZE_OFFSET] = (byte)((msglen/0x1000000) & 0xff); |
| bytes[MSG_HEADER_SIZE_OFFSET+1] = (byte)((msglen/0x10000) & 0xff); |
| bytes[MSG_HEADER_SIZE_OFFSET+2] = (byte)((msglen/0x100) & 0xff); |
| bytes[MSG_HEADER_SIZE_OFFSET+3] = (byte)(msglen & 0xff); |
| bytes[MSG_HEADER_TYPE_OFFSET] = (byte)NORMAL_MSG_TYPE; // message type |
| bytes[MSG_HEADER_ID_OFFSET] = (byte)((MsgIdGenerator.NO_MSG_ID/0x100) & 0xff); |
| bytes[MSG_HEADER_ID_OFFSET+1] = (byte)(MsgIdGenerator.NO_MSG_ID & 0xff); |
| bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK; |
| int allocSize = bytes.length; |
| ByteBuffer bb; |
| if (TCPConduit.useDirectBuffers) { |
| bb = ByteBuffer.allocateDirect(allocSize); |
| } else { |
| bb = ByteBuffer.allocate(allocSize); |
| } |
| bb.put(bytes); |
| okHandshakeBuf = bb; |
| okHandshakeBytes = bytes; |
| } |
| |
| /** |
| * maximum message buffer size |
| */ |
| public static final int MAX_MSG_SIZE = 0x00ffffff; |
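  /**
   * Packs the handshake version into the high byte of the header's 4-byte
   * size word; the low 24 bits hold the message byte size.
   */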
| public static int calcHdrSize(int byteSize) { |
| if (byteSize > MAX_MSG_SIZE) { |
| throw new IllegalStateException(LocalizedStrings.Connection_TCP_MESSAGE_EXCEEDED_MAX_SIZE_OF_0.toLocalizedString(Integer.valueOf(MAX_MSG_SIZE))); |
| } |
| int hdrSize = byteSize; |
| hdrSize |= (HANDSHAKE_VERSION << 24); |
| return hdrSize; |
| } |
| public static int calcMsgByteSize(int hdrSize) { |
| return hdrSize & MAX_MSG_SIZE; |
| } |
| public static byte calcHdrVersion(int hdrSize) throws IOException { |
| byte ver = (byte)(hdrSize >> 24); |
| if (ver != HANDSHAKE_VERSION) { |
| throw new IOException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(ver)})); |
| } |
| return ver; |
| } |
| private void sendOKHandshakeReply() throws IOException, ConnectionException { |
| byte[] my_okHandshakeBytes = null; |
| ByteBuffer my_okHandshakeBuf = null; |
| if (this.isReceiver) { |
| DistributionConfig cfg = owner.getConduit().config; |
| ByteBuffer bb; |
| if (useNIO() && TCPConduit.useDirectBuffers) { |
| bb = ByteBuffer.allocateDirect(128); |
| } else { |
| bb = ByteBuffer.allocate(128); |
| } |
| bb.putInt(0); // reserve first 4 bytes for packet length |
| bb.put((byte)NORMAL_MSG_TYPE); |
| bb.putShort(MsgIdGenerator.NO_MSG_ID); |
| bb.put(REPLY_CODE_OK_WITH_ASYNC_INFO); |
| bb.putInt(cfg.getAsyncDistributionTimeout()); |
| bb.putInt(cfg.getAsyncQueueTimeout()); |
| bb.putInt(cfg.getAsyncMaxQueueSize()); |
| // write own product version |
| Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true); |
| // now set the msg length into position 0 |
| bb.putInt(0, calcHdrSize(bb.position()-MSG_HEADER_BYTES)); |
| if (useNIO()) { |
| my_okHandshakeBuf = bb; |
| bb.flip(); |
| } else { |
| my_okHandshakeBytes = new byte[bb.position()]; |
| bb.flip(); |
| bb.get(my_okHandshakeBytes); |
| } |
| } else { |
| my_okHandshakeBuf = okHandshakeBuf; |
| my_okHandshakeBytes = okHandshakeBytes; |
| } |
| if (useNIO()) { |
| synchronized (my_okHandshakeBuf) { |
| my_okHandshakeBuf.position(0); |
| nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null); |
| } |
| } else { |
| synchronized(outLock) { |
| try { |
| // this.writerThread = Thread.currentThread(); |
| this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length); |
| this.output.flush(); |
| } |
| finally { |
| // this.writerThread = null; |
| } |
| } |
| } |
| } |
| |
| private static final int HANDSHAKE_TIMEOUT_MS = Integer.getInteger("p2p.handshakeTimeoutMs", 59000).intValue(); |
| //private static final byte HANDSHAKE_VERSION = 1; // 501 |
| //public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale |
| //public static final byte HANDSHAKE_VERSION = 3; // durable_client |
| //public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19 |
| // public static final byte HANDSHAKE_VERSION = 5; // split-brain bits |
| //public static final byte HANDSHAKE_VERSION = 6; // direct ack changes |
| // NOTICE: handshake_version should not be changed anymore. Use the gemfire |
| // version transmitted with the handshake bits and handle old handshakes |
| // based on that |
| public static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake |
| |
| /** |
| * @throws ConnectionException if the conduit has stopped |
| */ |
| private void waitForHandshake() throws ConnectionException { |
| boolean needToClose = false; |
| String reason = null; |
| try { |
| synchronized (this.handshakeSync) { |
| if (!this.handshakeRead && !this.handshakeCancelled) { |
| boolean success = false; |
| reason = LocalizedStrings.Connection_UNKNOWN.toLocalizedString(); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| final long endTime = System.currentTimeMillis() + HANDSHAKE_TIMEOUT_MS; |
| long msToWait = HANDSHAKE_TIMEOUT_MS; |
| while (!this.handshakeRead && !this.handshakeCancelled && msToWait > 0) { |
| this.handshakeSync.wait(msToWait); // spurious wakeup ok |
| if (!this.handshakeRead && !this.handshakeCancelled) { |
| msToWait = endTime - System.currentTimeMillis(); |
| } |
| } |
| if (!this.handshakeRead && !this.handshakeCancelled) { |
| reason = LocalizedStrings.Connection_HANDSHAKE_TIMED_OUT.toLocalizedString(); |
| String peerName; |
| if (this.remoteAddr != null) { |
| peerName = this.remoteAddr.toString(); |
| } |
| else { |
| peerName = "socket " + this.socket.getRemoteSocketAddress().toString() |
| + ":" + this.socket.getPort(); |
| } |
| throw new ConnectionException(LocalizedStrings.Connection_CONNECTION_HANDSHAKE_WITH_0_TIMED_OUT_AFTER_WAITING_1_MILLISECONDS.toLocalizedString( |
| new Object[] {peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)})); |
| } else { |
| success = this.handshakeRead; |
| } |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); |
| reason = LocalizedStrings.Connection_INTERRUPTED.toLocalizedString(); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| if (success) { |
| if (this.isReceiver) { |
| needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr, this.remoteId); |
| if (needToClose) { |
| reason = "this member is shunned"; |
| } |
| } |
| } |
| else { |
| needToClose = true; // for bug 42159 |
| } |
| } |
| } // !handshakeRead |
| } // synchronized |
| |
| } finally { |
| if (needToClose) { |
| // moved this call outside of the sync for bug 42159 |
| try { |
| requestClose(reason); // fix for bug 31546 |
| } |
| catch (Exception ignore) { |
| } |
| } |
| } |
| } |
| |
| private void notifyHandshakeWaiter(boolean success) { |
| synchronized (this.handshakeSync) { |
| if (success) { |
| this.handshakeRead = true; |
| } else { |
| this.handshakeCancelled = true; |
| } |
| this.handshakeSync.notify(); |
| } |
| } |
| |
| /** |
| * asynchronously close this connection |
| * |
| * @param beingSick |
| */ |
| private void asyncClose(boolean beingSick) { |
| // note: remoteId may be null if this is a receiver that hasn't finished its handshake |
| |
| // we do the close in a background thread because the operation may hang if |
| // there is a problem with the network. See bug #46659 |
| Runnable r = new Runnable() { |
| public void run() { |
| boolean rShuttingDown = readerShuttingDown; |
| synchronized(stateLock) { |
          if (readerThread != null && readerThread.isAlive() &&
              !rShuttingDown && (connectionState == STATE_READING
              || connectionState == STATE_READING_ACK)) {
| readerThread.interrupt(); |
| } |
| } |
| } |
| }; |
| // if simulating sickness, sockets must be closed in-line so that tests know |
| // that the vm is sick when the beSick operation completes |
| if (beingSick) { |
| r.run(); |
| } |
| else { |
| SocketCreator.asyncClose(this.socket, String.valueOf(this.remoteAddr), r); |
| } |
| } |
| |
| |
| private static final int CONNECT_HANDSHAKE_SIZE = 4096; |
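  // Payload of the connect handshake sent by both handshakeNio() and
  // handshakeStream(): a zero byte, HANDSHAKE_VERSION, the serialized member
  // id, sharedResource, preserveOrder, uniqueId, the sender's product version
  // ordinal, and dominoCount+1.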
| |
| private void handshakeNio() throws IOException { |
| // We jump through some extra hoops to use a MsgOutputStream |
| // This keeps us from allocating an extra DirectByteBuffer. |
| InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress(); |
| synchronized (myAddr) { |
| while (myAddr.getIpAddress() == null) { |
| try { |
| myAddr.wait(); // spurious wakeup ok |
| } |
| catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie); |
| } |
| } |
| } |
| |
| Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort()); |
| |
| final MsgOutputStream connectHandshake |
| = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE); |
| //connectHandshake.reset(); |
| /** |
| * Note a byte of zero is always written because old products |
| * serialized a member id with always sends an ip address. |
| * My reading of the ip-address specs indicated that the first byte |
| * of a valid address would never be 0. |
| */ |
| connectHandshake.writeByte(0); |
| connectHandshake.writeByte(HANDSHAKE_VERSION); |
| // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION |
| InternalDataSerializer.invokeToData(myAddr, connectHandshake); |
| connectHandshake.writeBoolean(this.sharedResource); |
| connectHandshake.writeBoolean(this.preserveOrder); |
| connectHandshake.writeLong(this.uniqueId); |
| // write the product version ordinal |
| Version.CURRENT.writeOrdinal(connectHandshake, true); |
| connectHandshake.writeInt(dominoCount.get()+1); |
| // this writes the sending member + thread name that is stored in senderName |
| // on the receiver to show the cause of reader thread creation |
| // if (dominoCount.get() > 0) { |
| // connectHandshake.writeUTF(Thread.currentThread().getName()); |
| // } else { |
| // String name = owner.getDM().getConfig().getName(); |
| // if (name == null) { |
| // name = "pid="+OSProcess.getId(); |
| // } |
| // connectHandshake.writeUTF("["+name+"] "+Thread.currentThread().getName()); |
| // } |
| connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, DistributionManager.STANDARD_EXECUTOR, MsgIdGenerator.NO_MSG_ID); |
| nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null); |
| } |
| |
| private void handshakeStream() throws IOException { |
| //this.output = new BufferedOutputStream(getSocket().getOutputStream(), owner.getConduit().bufferSize); |
| this.output = getSocket().getOutputStream(); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE); |
| DataOutputStream os = new DataOutputStream(baos); |
| InternalDistributedMember myAddr = owner.getConduit().getLocalAddress(); |
| os.writeByte(0); |
| os.writeByte(HANDSHAKE_VERSION); |
| // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION |
| InternalDataSerializer.invokeToData(myAddr, os); |
| os.writeBoolean(this.sharedResource); |
| os.writeBoolean(this.preserveOrder); |
| os.writeLong(this.uniqueId); |
| Version.CURRENT.writeOrdinal(os, true); |
| os.writeInt(dominoCount.get()+1); |
| // this writes the sending member + thread name that is stored in senderName |
| // on the receiver to show the cause of reader thread creation |
| // if (dominoCount.get() > 0) { |
| // os.writeUTF(Thread.currentThread().getName()); |
| // } else { |
| // String name = owner.getDM().getConfig().getName(); |
| // if (name == null) { |
| // name = "pid="+OSProcess.getId(); |
| // } |
| // os.writeUTF("["+name+"] "+Thread.currentThread().getName()); |
| // } |
| os.flush(); |
| |
| byte[] msg = baos.toByteArray(); |
| int len = calcHdrSize(msg.length); |
| byte[] lenbytes = new byte[MSG_HEADER_BYTES]; |
| lenbytes[MSG_HEADER_SIZE_OFFSET] = (byte)((len/0x1000000) & 0xff); |
| lenbytes[MSG_HEADER_SIZE_OFFSET+1] = (byte)((len/0x10000) & 0xff); |
| lenbytes[MSG_HEADER_SIZE_OFFSET+2] = (byte)((len/0x100) & 0xff); |
| lenbytes[MSG_HEADER_SIZE_OFFSET+3] = (byte)(len & 0xff); |
| lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte)NORMAL_MSG_TYPE; |
| lenbytes[MSG_HEADER_ID_OFFSET] = (byte)((MsgIdGenerator.NO_MSG_ID/0x100) & 0xff); |
| lenbytes[MSG_HEADER_ID_OFFSET+1] = (byte)(MsgIdGenerator.NO_MSG_ID & 0xff); |
| synchronized(outLock) { |
| try { |
| // this.writerThread = Thread.currentThread(); |
| this.output.write(lenbytes, 0, lenbytes.length); |
| this.output.write(msg, 0, msg.length); |
| this.output.flush(); |
| } |
| finally { |
| // this.writerThread = null; |
| } |
| } |
| } |
| |
| /** |
| * |
| * @throws IOException if handshake fails |
| */ |
| private void attemptHandshake() throws IOException { |
| // send HANDSHAKE |
| // send this server's port. It's expected on the other side |
| if (useNIO()) { |
| handshakeNio(); |
| } |
| else { |
| handshakeStream(); |
| } |
| |
| startReader(); // this reader only reads the handshake and then exits |
| waitForHandshake(); // waiting for reply |
| } |
| |
| /** time between connection attempts*/ |
| private static final int RECONNECT_WAIT_TIME |
| = Integer.getInteger("gemfire.RECONNECT_WAIT_TIME", 2000).intValue(); |
| |
  /** creates a new connection to a remote server.
   * We are initiating this connection; the other side must accept us.
   * We will almost always send messages; small acks are received.
   */
| protected static Connection createSender(final MembershipManager mgr, |
| final ConnectionTable t, |
| final boolean preserveOrder, |
| final Stub key, |
| final InternalDistributedMember remoteAddr, |
| final boolean sharedResource, |
| final long startTime, |
| final long ackTimeout, |
| final long ackSATimeout) |
| throws IOException, DistributedSystemDisconnectedException |
| { |
| boolean warningPrinted = false; |
| boolean success = false; |
| boolean firstTime = true; |
| Connection conn = null; |
| // keep trying. Note that this may be executing during the shutdown window |
| // where a cancel criterion has not been established, but threads are being |
| // interrupted. In this case we must allow the connection to succeed even |
| // though subsequent messaging using the socket may fail |
| boolean interrupted = Thread.interrupted(); |
| boolean severeAlertIssued = false; |
| boolean suspected = false; |
| long reconnectWaitTime = RECONNECT_WAIT_TIME; |
| boolean connectionErrorLogged = false; |
| try { |
| while (!success) { // keep trying |
| // Quit if DM has stopped distribution |
| t.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| long now = System.currentTimeMillis(); |
| if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) { |
| if (startTime + ackTimeout + ackSATimeout < now) { |
| if (remoteAddr != null) { |
| logger.fatal(LocalizedMessage.create( |
| LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS, new Object[] { remoteAddr, |
| (ackSATimeout + ackTimeout) / 1000 })); |
| } |
| severeAlertIssued = true; |
| } |
| else if (!suspected) { |
| if (remoteAddr != null) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS, |
| new Object[] { remoteAddr, (ackTimeout)/1000 })); |
| } |
| mgr.suspectMember(remoteAddr, LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_IN_A_REASONABLE_AMOUNT_OF_TIME.toLocalizedString()); |
| suspected = true; |
| } |
| reconnectWaitTime = Math.min(RECONNECT_WAIT_TIME, |
| ackSATimeout - (now - startTime - ackTimeout)); |
| if (reconnectWaitTime <= 0) { |
| reconnectWaitTime = RECONNECT_WAIT_TIME; |
| } |
| } else if (!suspected && (startTime > 0) && (ackTimeout > 0) |
| && (startTime + ackTimeout < now)) { |
| mgr.suspectMember(remoteAddr, LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_IN_A_REASONABLE_AMOUNT_OF_TIME.toLocalizedString()); |
| suspected = true; |
| } |
| if (firstTime) { |
| firstTime = false; |
| InternalDistributedMember m = mgr.getMemberForStub(key, true); |
| if (m == null) { |
| throw new IOException("Member for stub " + key + " left the group"); |
| } |
| } |
| else { |
| // if we're sending an alert and can't connect, bail out. A sick |
| // alert listener should not prevent cache operations from continuing |
| if (AlertAppender.isThreadAlerting()) { |
| // do not change the text of this exception - it is looked for in exception handlers |
| throw new IOException("Cannot form connection to alert listener " + key); |
| } |
| |
| // Wait briefly... |
| interrupted = Thread.interrupted() || interrupted; |
| try { |
| Thread.sleep(reconnectWaitTime); |
| } |
| catch (InterruptedException ie) { |
| interrupted = true; |
| t.getConduit().getCancelCriterion().checkCancelInProgress(ie); |
| } |
| t.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| InternalDistributedMember m = mgr.getMemberForStub(key, true); |
| if (m == null) { |
| throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key)); |
| } |
| if (!warningPrinted) { |
| warningPrinted = true; |
| logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, m)); |
| } |
| t.getConduit().stats.incReconnectAttempts(); |
| } |
| //create connection |
| try { |
| conn = null; |
| conn = new Connection(mgr, t, preserveOrder, key, remoteAddr, sharedResource); |
| } |
| catch (javax.net.ssl.SSLHandshakeException se) { |
| // no need to retry if certificates were rejected |
| throw se; |
| } |
| catch (IOException ioe) { |
| // Only give up if the member leaves the view. |
| InternalDistributedMember m = mgr.getMemberForStub(key, true); |
| if (m == null) { |
| throw ioe; |
| } |
| t.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| if ("Too many open files".equals(ioe.getMessage())) { |
| t.fileDescriptorsExhausted(); |
| } |
| else if (!connectionErrorLogged) { |
          connectionErrorLogged = true; // otherwise the change to use 100ms intervals causes a lot of these
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, |
| new Object[] {sharedResource, preserveOrder, m, ioe})); |
| } |
| } // IOException |
| finally { |
| if (conn == null) { |
| t.getConduit().stats.incFailedConnect(); |
| } |
| } |
| if (conn != null) { |
| // handshake |
| try { |
| conn.attemptHandshake(); |
| if (conn.isSocketClosed()) { |
            // something went wrong while reading the handshake
            // and the socket was closed, or the peer sent us a
            // ShutdownMessage
| InternalDistributedMember m = mgr.getMemberForStub(key, true); |
| if (m == null) { |
| throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key)); |
| } |
| t.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| // no success but no need to log; just retry |
| } |
| else { |
| success = true; |
| } |
| } |
| catch (DistributedSystemDisconnectedException e) { |
| throw e; |
| } |
| catch (ConnectionException e) { |
| InternalDistributedMember m = mgr.getMemberForStub(key, true); |
| if (m == null) { |
| IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString()); |
| ioe.initCause(e); |
| throw ioe; |
| } |
| t.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, |
| new Object[] {sharedResource, preserveOrder, m,e})); |
| } |
| catch (IOException e) { |
| InternalDistributedMember m = mgr.getMemberForStub(key, true); |
| if (m == null) { |
| throw e; |
| } |
| t.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, |
| new Object[] {sharedResource, preserveOrder, m,e})); |
| if (!sharedResource && "Too many open files".equals(e.getMessage())) { |
| t.fileDescriptorsExhausted(); |
| } |
| } |
| finally { |
| if (!success) { |
| try { |
| conn.requestClose(LocalizedStrings.Connection_FAILED_HANDSHAKE.toLocalizedString()); |
| } catch (Exception ignore) {} |
| conn = null; |
| } |
| } |
| } |
| } // while |
| if (warningPrinted) { |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_0_SUCCESSFULLY_REESTABLISHED_CONNECTION_TO_PEER_1, |
| new Object[] {mgr.getLocalMember(), remoteAddr})); |
| } |
| } |
| finally { |
| try { |
| if (!success) { |
| if (conn != null) { |
| conn.requestClose(LocalizedStrings.Connection_FAILED_CONSTRUCTION.toLocalizedString()); |
| conn = null; |
| } |
| } |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| //Assert.assertTrue(conn != null); |
| if (conn == null) { |
| throw new ConnectionException( |
| LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0 |
| .toLocalizedString(mgr.getMemberForStub(key, true))); |
| } |
| if (preserveOrder && BATCH_SENDS) { |
| conn.createBatchSendBuffer(); |
| } |
| conn.finishedConnecting = true; |
| return conn; |
| } |
| |
| private void setRemoteAddr(InternalDistributedMember m, Stub stub) { |
| this.remoteAddr = this.owner.getDM().getCanonicalId(m); |
| this.remoteId = stub; |
| MembershipManager mgr = this.owner.owner.getMembershipManager(); |
| mgr.addSurpriseMember(m, stub); |
| } |
| |
  /** creates a new connection to a remote server.
   * We are initiating this connection; the other side must accept us.
   * We will almost always send messages; small acks are received.
   */
| private Connection(MembershipManager mgr, |
| ConnectionTable t, |
| boolean preserveOrder, |
| Stub key, |
| InternalDistributedMember remoteAddr, |
| boolean sharedResource) |
| throws IOException, DistributedSystemDisconnectedException |
| { |
| if (t == null) { |
| throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString()); |
| } |
| this.isReceiver = false; |
| this.owner = t; |
| this.sharedResource = sharedResource; |
| this.preserveOrder = preserveOrder; |
| setRemoteAddr(remoteAddr, key); |
| this.conduitIdStr = this.owner.getConduit().getId().toString(); |
| this.handshakeRead = false; |
| this.handshakeCancelled = false; |
| this.connected = true; |
| |
| this.uniqueId = idCounter.getAndIncrement(); |
| |
| // connect to listening socket |
| |
| InetSocketAddress addr = new InetSocketAddress(remoteId.getInetAddress(), remoteId.getPort()); |
| if (useNIO()) { |
| SocketChannel channel = SocketChannel.open(); |
| this.owner.addConnectingSocket(channel.socket(), addr.getAddress()); |
| try { |
| channel.socket().setTcpNoDelay(true); |
| |
| channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE); |
| |
| /** If conserve-sockets is false, the socket can be used for receiving responses, |
| * so set the receive buffer accordingly. |
| */ |
| if(!sharedResource) { |
| setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize); |
| } |
| else { |
| setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only receive ack messages |
| } |
| setSendBufferSize(channel.socket()); |
| channel.configureBlocking(true); |
| |
        int connectTime = getP2PConnectTimeout();
| |
| try { |
| SocketUtils.connect(channel.socket(), addr, connectTime); |
| } catch (NullPointerException e) { |
| // bug #45044 - jdk 1.7 sometimes throws an NPE here |
| ConnectException c = new ConnectException("Encountered bug #45044 - retrying"); |
| c.initCause(e); |
| // prevent a hot loop by sleeping a little bit |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| throw c; |
| } catch (CancelledKeyException e) { |
| // bug #44469: for some reason NIO throws this runtime exception |
| // instead of an IOException on timeouts |
| ConnectException c = new ConnectException(LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS |
| .toLocalizedString(new Object[]{connectTime})); |
| c.initCause(e); |
| throw c; |
| } catch (ClosedSelectorException e) { |
          // bug #44808: for some reason JRockit NIO throws this runtime exception
| // instead of an IOException on timeouts |
| ConnectException c = new ConnectException(LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS |
| .toLocalizedString(new Object[]{connectTime})); |
| c.initCause(e); |
| throw c; |
| } |
| } |
| finally { |
| this.owner.removeConnectingSocket(channel.socket()); |
| } |
| this.socket = channel.socket(); |
| } |
| else { |
| if (TCPConduit.useSSL) { |
| // socket = javax.net.ssl.SSLSocketFactory.getDefault() |
| // .createSocket(remoteId.getInetAddress(), remoteId.getPort()); |
| int socketBufferSize = sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize; |
| this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteId.getInetAddress(), remoteId.getPort(), socketBufferSize ); |
        // Set the local receive buffer size field; it has already been set on the socket.
| setSocketBufferSize(this.socket, false, socketBufferSize, true); |
| setSendBufferSize(this.socket); |
| } |
| else { |
| //socket = new Socket(remoteId.getInetAddress(), remoteId.getPort()); |
| Socket s = new Socket(); |
| this.socket = s; |
| s.setTcpNoDelay(true); |
| s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE); |
| setReceiveBufferSize(s, SMALL_BUFFER_SIZE); |
| setSendBufferSize(s); |
| SocketUtils.connect(s, addr, 0); |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Connection: connected to {} with stub {}", remoteAddr, addr); |
| } |
| try { getSocket().setTcpNoDelay(true); } catch (SocketException e) { } |
| } |
| |
| /** |
| * Batch sends currently should not be turned on because: |
| * 1. They will be used for all sends (instead of just no-ack) |
| * and thus will break messages that wait for a response (or kill perf). |
| * 2. The buffer is not properly flushed and closed on shutdown. |
| * The code attempts to do this but must not be doing it correctly. |
| */ |
| private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends"); |
| protected static final int BATCH_BUFFER_SIZE = Integer.getInteger("p2p.batchBufferSize", 1024*1024).intValue(); |
| protected static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue(); |
| protected Object batchLock; |
| protected ByteBuffer fillBatchBuffer; |
| protected ByteBuffer sendBatchBuffer; |
| private BatchBufferFlusher batchFlusher; |
| |
| private void createBatchSendBuffer() { |
| // batch send buffer isn't needed if old-io is being used |
| if (!this.useNIO) { |
| return; |
| } |
| this.batchLock = new Object(); |
| if (TCPConduit.useDirectBuffers) { |
| this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE); |
| this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE); |
| } else { |
| this.fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE); |
| this.sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE); |
| } |
| this.batchFlusher = new BatchBufferFlusher(); |
| this.batchFlusher.start(); |
| } |
| |
| private class BatchBufferFlusher extends Thread { |
| private volatile boolean flushNeeded = false; |
| private volatile boolean timeToStop = false; |
| private DMStats stats; |
| |
| |
| public BatchBufferFlusher() { |
| setDaemon(true); |
| this.stats = owner.getConduit().stats; |
| } |
| /** |
| * Called when a message writer needs the current fillBatchBuffer flushed |
| */ |
| public void flushBuffer(ByteBuffer bb) { |
| final long start = DistributionStats.getStatTime(); |
| try { |
| synchronized (this) { |
| synchronized (batchLock) { |
| if (bb != fillBatchBuffer) { |
| // it must have already been flushed. So just return |
| // and use the new fillBatchBuffer |
| return; |
| } |
| } |
| this.flushNeeded = true; |
| this.notify(); |
| } |
| synchronized (batchLock) { |
| // Wait for the flusher thread |
| while (bb == fillBatchBuffer) { |
| Connection.this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| batchLock.wait(); // spurious wakeup ok |
| } |
| catch (InterruptedException ex) { |
| interrupted = true; |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // while |
| } |
| } finally { |
| owner.getConduit().stats.incBatchWaitTime(start); |
| } |
| } |
| |
| public void close() { |
| synchronized (this) { |
| this.timeToStop = true; |
| this.flushNeeded = true; |
| this.notify(); |
| } |
| } |
| |
| @Override |
| public void run() { |
| try { |
| synchronized (this) { |
| while (!timeToStop) { |
| if (!this.flushNeeded && fillBatchBuffer.position() <= (BATCH_BUFFER_SIZE/2)) { |
| wait(BATCH_FLUSH_MS); // spurious wakeup ok |
| } |
| if (this.flushNeeded || fillBatchBuffer.position() > (BATCH_BUFFER_SIZE/2)) { |
| final long start = DistributionStats.getStatTime(); |
| synchronized (batchLock) { |
| // This is the only block of code that will swap |
| // the buffer references |
| this.flushNeeded = false; |
| ByteBuffer tmp = fillBatchBuffer; |
| fillBatchBuffer = sendBatchBuffer; |
| sendBatchBuffer = tmp; |
| batchLock.notifyAll(); |
| } |
| // We now own the sendBatchBuffer |
| if (sendBatchBuffer.position() > 0) { |
| final boolean origSocketInUse = socketInUse; |
| socketInUse = true; |
| try { |
| sendBatchBuffer.flip(); |
| SocketChannel channel = getSocket().getChannel(); |
| nioWriteFully(channel, sendBatchBuffer, false, null); |
| sendBatchBuffer.clear(); |
| } catch (IOException ex) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0,ex)); |
| readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0.toLocalizedString(ex)); |
| } catch (ConnectionException ex) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0,ex)); |
| readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0.toLocalizedString(ex)); |
| } finally { |
| accessed(); |
| socketInUse = origSocketInUse; |
| } |
| } |
| this.stats.incBatchFlushTime(start); |
| } |
| } |
| } |
| } catch (InterruptedException ex) { |
| // time for this thread to shutdown |
| // Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| private void closeBatchBuffer() { |
| if (this.batchFlusher != null) { |
| this.batchFlusher.close(); |
| } |
| } |
| |
  /** used to test message prep overhead (no socket write).
   * WARNING: turning this on completely disables distribution of batched sends
   */
| private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite"); |
| |
| private void batchSend(ByteBuffer src) throws IOException { |
| if (SOCKET_WRITE_DISABLED) { |
| return; |
| } |
| final long start = DistributionStats.getStatTime(); |
| try { |
| ByteBuffer dst = null; |
      Assert.assertTrue(src.remaining() <= BATCH_BUFFER_SIZE, "Message size(" + src.remaining() + ") exceeded BATCH_BUFFER_SIZE(" + BATCH_BUFFER_SIZE + ")");
| do { |
| synchronized (this.batchLock) { |
| dst = this.fillBatchBuffer; |
| if (src.remaining() <= dst.remaining()) { |
| final long copyStart = DistributionStats.getStatTime(); |
| dst.put(src); |
| this.owner.getConduit().stats.incBatchCopyTime(copyStart); |
| return; |
| } |
| } |
| // If we got this far then we do not have room in the current |
| // buffer and need the flusher thread to flush before we can fill it |
| this.batchFlusher.flushBuffer(dst); |
| } while (true); |
| } finally { |
| this.owner.getConduit().stats.incBatchSendTime(start); |
| } |
| } |
| |
| /** |
| * Request that the manager close this connection, or close it |
| * forcibly if there is no manager. Invoking this method ensures |
| * that the proper synchronization is done. |
| */ |
| void requestClose(String reason) { |
| close(reason, true, true, false, false); |
| } |
| |
| boolean isClosing() { |
| return this.closing.get(); |
| } |
| |
| /** |
| * Used to close a connection that has not yet been registered |
| * with the distribution manager. |
| */ |
| void closePartialConnect(String reason) { |
| close(reason, false, false, false, false); |
| } |
| |
| void closePartialConnect(String reason, boolean beingSick) { |
| close(reason, false, false, beingSick, false); |
| } |
| |
| void closeForReconnect(String reason) { |
| close(reason, true, false, false, false); |
| } |
| |
| void closeOldConnection(String reason) { |
| close(reason, true, true, false, true); |
| } |
| |
| /** |
| * Closes the connection. |
| * |
| * @see #requestClose |
| */ |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="TLW_TWO_LOCK_WAIT") |
| private void close(String reason, boolean cleanupEndpoint, |
| boolean p_removeEndpoint, boolean beingSick, boolean forceRemoval) { |
| boolean removeEndpoint = p_removeEndpoint; |
| // use getAndSet outside sync on this to fix 42330 |
| boolean onlyCleanup = this.closing.getAndSet(true); |
| if (onlyCleanup && !forceRemoval) { |
| return; |
| } |
| if (!onlyCleanup) { |
| synchronized (this) { |
| this.stopped = true; |
| if (this.connected) { |
| if (this.asyncQueuingInProgress |
| && this.pusherThread != Thread.currentThread()) { |
| // We don't need to do this if we are the pusher thread |
| // and we have determined that we need to close the connection. |
| // See bug 37601. |
| synchronized (this.outgoingQueue) { |
| // wait for the flusher to complete (it may timeout) |
| while (this.asyncQueuingInProgress) { |
| // Don't do this: causes closes to not get done in the event |
| // of an orderly shutdown: |
| // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.outgoingQueue.wait(); // spurious wakeup ok |
| } catch (InterruptedException ie) { |
| interrupted = true; |
| // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie); |
| } |
| finally { |
| if (interrupted) Thread.currentThread().interrupt(); |
| } |
| } // while |
| } // synchronized |
| } |
| this.connected = false; |
| closeSenderSem(); |
| { |
| final DMStats stats = this.owner.getConduit().stats; |
| if (this.finishedConnecting) { |
| if (this.isReceiver) { |
| stats.decReceivers(); |
| } else { |
| stats.decSenders(this.sharedResource, this.preserveOrder); |
| } |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Closing socket for {}", this); |
| } |
| } |
| else if (!forceRemoval) { |
| removeEndpoint = false; |
| } |
| // make sure our socket is closed |
| asyncClose(false); |
| nioLengthSet = false; |
| } // synchronized |
| |
| // moved the call to notifyHandshakeWaiter out of the above |
| // synchronized block to fix bug #42159 |
| // Make sure anyone waiting for a handshake stops waiting |
| notifyHandshakeWaiter(false); |
      // wait a bit for our reader thread to exit
| // don't wait if we are the reader thread |
| boolean isIBM = false; |
| // if network partition detection is enabled or this is an admin vm |
| // we can't wait for the reader thread when running in an IBM JRE. See |
| // bug 41889 |
| if (this.owner.owner.config.getEnableNetworkPartitionDetection() || |
| this.owner.owner.getLocalId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE || |
| this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { |
| isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor")); |
| } |
| if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive() |
| && this.readerThread != Thread.currentThread()) { |
| try { |
| this.readerThread.join(500); |
| if (this.readerThread.isAlive() && !this.readerShuttingDown |
| && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure |
| this.readerThread.join(1500); |
| if (this.readerThread.isAlive()) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this)); |
| } |
| } |
| } |
| catch (IllegalThreadStateException ignore) { |
| // ignored - thread already stopped |
| } |
| catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| // but keep going, we're trying to close. |
| } |
      }
| |
| closeBatchBuffer(); |
| closeAllMsgDestreamers(); |
| } |
| if (cleanupEndpoint) { |
| if (this.isReceiver) { |
| this.owner.removeReceiver(this); |
| } |
| if (removeEndpoint) { |
| if (this.sharedResource) { |
| if (!this.preserveOrder) { |
| // only remove endpoint when shared unordered connection |
| // is closed. This is part of the fix for bug 32980. |
| if (!this.isReceiver) { |
| // Only remove endpoint if sender. |
| if (this.finishedConnecting) { |
| // only remove endpoint if our constructor finished |
| this.owner.removeEndpoint(this.remoteId, reason); |
| } |
| } |
| } |
| else { |
| this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this); |
| } |
| } |
| else if (!this.isReceiver) { |
| this.owner.removeThreadConnection(this.remoteId, this); |
| } |
| } |
| else { |
| // This code is ok to do even if the ConnectionTable |
| // has never added this Connection to its maps since |
| // the calls in this block use our identity to do the removes. |
| if (this.sharedResource) { |
| this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this); |
| } |
| else if (!this.isReceiver) { |
| this.owner.removeThreadConnection(this.remoteId, this); |
| } |
| } |
| } |
| |
    //This cancels the idle timer task, but it also removes the task's
    //reference to this connection, freeing up the connection (and its buffers)
    //for GC sooner.
| if(idleTask != null) { |
| idleTask.cancel(); |
| } |
| |
| if(ackTimeoutTask!=null){ |
| ackTimeoutTask.cancel(); |
| } |
| |
| } |
| |
| /** starts a reader thread */ |
| private void startReader() { |
| ThreadGroup group = |
| LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger); |
| Assert.assertTrue(this.readerThread == null); |
| this.readerThread = |
| new Thread(group, this, p2pReaderName()); |
| this.readerThread.setDaemon(true); |
| stopped = false; |
| this.readerThread.start(); |
| } |
| |
| |
  /** In order to read non-NIO socket-based messages we need to have a thread
      actively trying to grab bytes out of the socket's input queue.
      This is that thread. */
| public void run() { |
| ConnectionTable.threadWantsSharedResources(); |
| if (this.isReceiver) { |
| makeReaderThread(); |
| } |
| try { |
| if (useNIO()) { |
| runNioReader(); |
| } else { |
| runOioReader(); |
| } |
| } finally { |
| // bug36060: do the socket close within a finally block |
| if (logger.isDebugEnabled()) { |
| logger.debug("Stopping {} for {}", p2pReaderName(), remoteId); |
| } |
| if (this.isReceiver) { |
| if (!this.sharedResource) { |
| this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get()); |
| } |
| asyncClose(false); |
| this.owner.removeAndCloseThreadOwnedSockets(); |
| } |
| ByteBuffer tmp = this.nioInputBuffer; |
| if(tmp != null) { |
| this.nioInputBuffer = null; |
| final DMStats stats = this.owner.getConduit().stats; |
| Buffers.releaseReceiveBuffer(tmp, stats); |
| } |
| // make sure that if the reader thread exits we notify a thread waiting |
| // for the handshake. |
| // see bug 37524 for an example of listeners hung in waitForHandshake |
| notifyHandshakeWaiter(false); |
| } // finally |
| } |
| |
| private String p2pReaderName() { |
| StringBuffer sb = new StringBuffer(64); |
| if (this.isReceiver) { |
| sb.append("P2P message reader@"); |
| } else { |
| sb.append("P2P handshake reader@"); |
| } |
| sb.append(Integer.toHexString(System.identityHashCode(this))); |
| if (!this.isReceiver) { |
| sb.append('-') |
| .append(getUniqueId()); |
| } |
| return sb.toString(); |
| } |
| |
| private void runNioReader() { |
| // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592 |
| SocketChannel channel = null; |
| try { |
| channel = getSocket().getChannel(); |
| channel.configureBlocking(true); |
| } catch (ClosedChannelException e) { |
| // bug 37693: the channel was asynchronously closed. Our work |
| // is done. |
| try { |
| requestClose(LocalizedStrings.Connection_RUNNIOREADER_CAUGHT_CLOSED_CHANNEL.toLocalizedString()); |
| } catch (Exception ignore) {} |
| return; // exit loop and thread |
| } catch (IOException ex) { |
| if (stopped || owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| try { |
| requestClose(LocalizedStrings.Connection_RUNNIOREADER_CAUGHT_SHUTDOWN.toLocalizedString()); |
| } catch (Exception ignore) {} |
| return; // bug37520: exit loop (and thread) |
| } |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_SETTING_CHANNEL_TO_BLOCKING_MODE_0, ex)); |
| this.readerShuttingDown = true; |
| try { requestClose(LocalizedStrings.Connection_FAILED_SETTING_CHANNEL_TO_BLOCKING_MODE_0.toLocalizedString(ex)); } catch (Exception ignore) {} |
| return; |
| } |
| |
| if (!stopped) { |
| // Assert.assertTrue(owner != null, "How did owner become null"); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Starting {}", p2pReaderName()); |
| } |
| } |
| // we should not change the state of the connection if we are a handshake reader thread |
| // as there is a race between this thread and the application thread doing direct ack |
| // fix for #40869 |
| boolean isHandShakeReader = false; |
| try { |
| for (;;) { |
| if (stopped) { |
| break; |
| } |
| if (SystemFailure.getFailure() != null) { |
| // Allocate no objects here! |
| Socket s = this.socket; |
| if (s != null) { |
| try { |
| s.close(); |
| } |
| catch (IOException e) { |
| // don't care |
| } |
| } |
| SystemFailure.checkFailure(); // throws |
| } |
| if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| break; |
| } |
| |
| try { |
| ByteBuffer buff = getNIOBuffer(); |
| synchronized(stateLock) { |
| connectionState = STATE_READING; |
| } |
| int amt = channel.read(buff); |
| synchronized(stateLock) { |
| connectionState = STATE_IDLE; |
| } |
| if (amt == 0) { |
| continue; |
| } |
| if (amt < 0) { |
| this.readerShuttingDown = true; |
          try {
            requestClose(LocalizedStrings.Connection_SOCKETCHANNEL_READ_RETURNED_EOF.toLocalizedString());
| } catch (Exception e) { |
| // ignore - shutting down |
| } |
| return; |
| } |
| |
| processNIOBuffer(); |
| if (!this.isReceiver |
| && (this.handshakeRead || this.handshakeCancelled)) { |
| if (logger.isDebugEnabled()) { |
| if (this.handshakeRead) { |
| logger.debug("{} handshake has been read {}", p2pReaderName(), this); |
| } else { |
| logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this); |
| } |
| } |
| isHandShakeReader = true; |
| // Once we have read the handshake the reader can go away |
| break; |
| } |
| } |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e); |
| } |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_CACHECLOSED_IN_CHANNEL_READ_0.toLocalizedString(e)); |
| } catch (Exception ex) {} |
| return; |
| } |
| catch (ClosedChannelException e) { |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e)); |
| } catch (Exception ex) {} |
| return; |
| } |
| catch (IOException e) { |
| if (! isSocketClosed() |
| && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08 |
| ) { |
| if (logger.isDebugEnabled() && !isIgnorableIOException(e)) { |
| logger.debug("{} io exception for {}", p2pReaderName(), this, e); |
| } |
            if (e.getMessage() != null && e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
| if (logger.isDebugEnabled()) { |
| logger.debug("{} received unexpected WSACancelBlockingCall exception, which may result in a hang", p2pReaderName()); |
| } |
| } |
| } |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e)); |
| } catch (Exception ex) {} |
| return; |
| |
| } catch (Exception e) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101 |
| if (!stopped && ! isSocketClosed() ) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e); |
| } |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e)); |
| } catch (Exception ex) {} |
| return; |
| } |
| } // for |
| } |
| finally { |
| if (!isHandShakeReader) { |
| synchronized(stateLock) { |
| connectionState = STATE_IDLE; |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} runNioReader terminated id={} from {}", p2pReaderName(), conduitIdStr, remoteAddr); |
| } |
| } |
| } |
| |
| /** |
| * checks to see if an exception should not be logged: i.e., "forcibly closed", |
| * "reset by peer", or "connection reset" |
| * */ |
| public static final boolean isIgnorableIOException(Exception e) { |
| if (e instanceof ClosedChannelException) { |
| return true; |
| } |
| |
| String msg = e.getMessage(); |
| if (msg == null) { |
| msg = e.toString(); |
| } |
| |
| msg = msg.toLowerCase(); |
| return (msg.indexOf("forcibly closed") >= 0) |
| || (msg.indexOf("reset by peer") >= 0) |
| || (msg.indexOf("connection reset") >= 0); |
| } |
| |
| private static boolean validMsgType(int msgType) { |
| return msgType == NORMAL_MSG_TYPE |
| || msgType == CHUNKED_MSG_TYPE |
| || msgType == END_CHUNKED_MSG_TYPE; |
| } |
| |
| private void closeAllMsgDestreamers() { |
| synchronized (this.destreamerLock) { |
| if (this.idleMsgDestreamer != null) { |
| this.idleMsgDestreamer.close(); |
| this.idleMsgDestreamer = null; |
| } |
| if (this.destreamerMap != null) { |
| Iterator it = this.destreamerMap.values().iterator(); |
| while (it.hasNext()) { |
| MsgDestreamer md = (MsgDestreamer)it.next(); |
| md.close(); |
| } |
| this.destreamerMap = null; |
| } |
| } |
| } |
| |
| MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) { |
| synchronized (this.destreamerLock) { |
| if (this.destreamerMap == null) { |
| this.destreamerMap = new HashMap(); |
| } |
| Short key = new Short(msgId); |
| MsgDestreamer result = (MsgDestreamer)this.destreamerMap.get(key); |
| if (result == null) { |
| result = this.idleMsgDestreamer; |
| if (result != null) { |
| this.idleMsgDestreamer = null; |
| } else { |
| result = new MsgDestreamer(this.owner.getConduit().stats, |
| this.owner.owner.getCancelCriterion(), v); |
| } |
| result.setName(p2pReaderName() + " msgId=" + msgId); |
| this.destreamerMap.put(key, result); |
| } |
| return result; |
| } |
| } |
| void releaseMsgDestreamer(short msgId, MsgDestreamer md) { |
| Short key = new Short(msgId); |
| synchronized (this.destreamerLock) { |
| this.destreamerMap.remove(key); |
| if (this.idleMsgDestreamer == null) { |
| md.reset(); |
| this.idleMsgDestreamer = md; |
| } else { |
| md.close(); |
| } |
| } |
| } |
| |
| private void sendFailureReply(int rpId, String exMsg, Throwable ex, boolean directAck) { |
| ReplySender dm = null; |
| if(directAck) { |
| dm = new DirectReplySender(this); |
| } else if(rpId != 0) { |
| dm = this.owner.getDM(); |
| } |
| if (dm != null) { |
| ReplyMessage.send(getRemoteAddress(), rpId, new ReplyException(exMsg, ex), dm); |
| } |
| } |
| private void runOioReader() { |
| InputStream input = null; |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Socket is of type: {}", getSocket().getClass() ); |
| } |
| input = new BufferedInputStream(getSocket().getInputStream(), INITIAL_CAPACITY); |
| } |
| catch (IOException io) { |
| if (stopped || owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| return; // bug 37520: exit run loop (and thread) |
| } |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_INPUT_STREAM), io); |
| stopped = true; |
| } |
| |
| if (!stopped) { |
| Assert.assertTrue(owner != null, LocalizedStrings.Connection_OWNER_SHOULD_NOT_BE_NULL.toLocalizedString()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Starting {}", p2pReaderName()); |
| } |
| } |
| |
| byte[] lenbytes = new byte[MSG_HEADER_BYTES]; |
| |
| final ByteArrayDataInput dis = new ByteArrayDataInput(); |
| while (!stopped) { |
| try { |
| if (SystemFailure.getFailure() != null) { |
| // Allocate no objects here! |
| Socket s = this.socket; |
| if (s != null) { |
| try { |
| s.close(); |
| } |
| catch (IOException e) { |
| // don't care |
| } |
| } |
| SystemFailure.checkFailure(); // throws |
| } |
| if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| break; |
| } |
| int len = 0; |
| if (readFully(input, lenbytes, lenbytes.length) < 0) { |
| stopped = true; |
| continue; |
| } |
| // long recvNanos = DistributionStats.getStatTime(); |
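        // the message header is: a big-endian 4-byte length, a 1-byte message
        // type (possibly with DIRECT_ACK_BIT set) and a big-endian 2-byte
        // message id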
| len = ((lenbytes[MSG_HEADER_SIZE_OFFSET]&0xff) * 0x1000000) + |
| ((lenbytes[MSG_HEADER_SIZE_OFFSET+1]&0xff) * 0x10000) + |
| ((lenbytes[MSG_HEADER_SIZE_OFFSET+2]&0xff) * 0x100) + |
| (lenbytes[MSG_HEADER_SIZE_OFFSET+3]&0xff); |
| /*byte msgHdrVersion =*/ calcHdrVersion(len); |
| len = calcMsgByteSize(len); |
| int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET]; |
        short msgId = (short)(((lenbytes[MSG_HEADER_ID_OFFSET]&0xff) * 0x100)
                      + (lenbytes[MSG_HEADER_ID_OFFSET+1]&0xff));
| boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0; |
| if (myDirectAck) { |
| msgType &= ~DIRECT_ACK_BIT; // clear the bit |
| } |
| // Following validation fixes bug 31145 |
| if (!validMsgType(msgType)) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0, Integer.valueOf(msgType))); |
| this.readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0.toLocalizedString(Integer.valueOf(msgType))); |
| break; |
| } |
        if (logger.isTraceEnabled()) {
          logger.trace("{} reading {} bytes", conduitIdStr, len);
        }
| byte[] bytes = new byte[len]; |
| if (readFully(input, bytes, len) < 0) { |
| stopped = true; |
| continue; |
| } |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (this.handshakeRead) { |
| if (msgType == NORMAL_MSG_TYPE) { |
| //DMStats stats = this.owner.getConduit().stats; |
| //long start = DistributionStats.getStatTime(); |
| this.owner.getConduit().stats.incMessagesBeingReceived(true, len); |
| dis.initialize(bytes, this.remoteVersion); |
| DistributionMessage msg = null; |
| try { |
| ReplyProcessor21.initMessageRPId(); |
| long startSer = this.owner.getConduit().stats.startMsgDeserialization(); |
| msg = (DistributionMessage)InternalDataSerializer.readDSFID(dis); |
| this.owner.getConduit().stats.endMsgDeserialization(startSer); |
| if (dis.available() != 0) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES, |
| new Object[] { msg, Integer.valueOf(dis.available())})); |
| } |
| //stats.incBatchCopyTime(start); |
| try { |
| //start = DistributionStats.getStatTime(); |
| if (!dispatchMessage(msg, len, myDirectAck)) { |
| continue; |
| } |
| //stats.incBatchSendTime(start); |
| } |
| catch (MemberShunnedException e) { |
| continue; |
| } |
| catch (Exception de) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); // bug 37101 |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DISPATCHING_MESSAGE), de); |
| } |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable e) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| // In particular I want OutOfMem to be caught here |
| if (!myDirectAck) { |
| String reason = LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE.toLocalizedString(); |
| sendFailureReply(ReplyProcessor21.getMessageRPId(), reason, e, myDirectAck); |
| } |
| if(e instanceof CancelException) { |
| if (!(e instanceof CacheClosedException)) { |
| // Just log a message if we had trouble deserializing due to CacheClosedException; see bug 43543 |
| throw (CancelException) e; |
| } |
| } |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE), e); |
| //requestClose(); |
| //return; |
| } finally { |
| ReplyProcessor21.clearMessageRPId(); |
| } |
| } else if (msgType == CHUNKED_MSG_TYPE) { |
| MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion); |
| this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len); |
| try { |
| md.addChunk(bytes); |
| } catch (IOException ex) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex); |
| } |
| } else /* (msgType == END_CHUNKED_MSG_TYPE) */ { |
| MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion); |
| this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len); |
| try { |
| md.addChunk(bytes); |
| } catch (IOException ex) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_HANDLING_END_CHUNK_MESSAGE), ex); |
| } |
| DistributionMessage msg = null; |
| int msgLength = 0; |
| String failureMsg = null; |
| Throwable failureEx = null; |
| int rpId = 0; |
| try { |
| msg = md.getMessage(); |
| } catch (ClassNotFoundException ex) { |
| this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); |
| failureEx = ex; |
| rpId = md.getRPid(); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex)); |
| } catch (IOException ex) { |
| this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); |
| failureMsg = LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE.toLocalizedString(); |
| failureEx = ex; |
| rpId = md.getRPid(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE), failureEx); |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| throw ex; // caught by outer try |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable ex) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); |
| failureMsg = LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE.toLocalizedString(); |
| failureEx = ex; |
| rpId = md.getRPid(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE), failureEx); |
| } finally { |
| msgLength = md.size(); |
| releaseMsgDestreamer(msgId, md); |
| } |
| if (msg != null) { |
| try { |
| if (!dispatchMessage(msg, msgLength, myDirectAck)) { |
| continue; |
| } |
| } |
| catch (MemberShunnedException e) { |
| continue; |
| } |
| catch (Exception de) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DISPATCHING_MESSAGE), de); |
| } |
| catch (ThreadDeath td) { |
| throw td; |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_THROWABLE_DISPATCHING_MESSAGE), t); |
| } |
| } else if (failureEx != null) { |
| sendFailureReply(rpId, failureMsg, failureEx, myDirectAck); |
| } |
| } |
| } else { |
| dis.initialize(bytes, null); |
| if (!this.isReceiver) { |
| this.replyCode = dis.readUnsignedByte(); |
| if (this.replyCode != REPLY_CODE_OK && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) { |
| Integer replyCodeInteger = Integer.valueOf(this.replyCode); |
| String err = LocalizedStrings.Connection_UNKNOWN_HANDSHAKE_REPLY_CODE_0.toLocalizedString(replyCodeInteger); |
| |
| if (this.replyCode == 0) { // bug 37113 |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} (peer probably departed ungracefully)", err); |
| } |
| } |
| else { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNKNOWN_HANDSHAKE_REPLY_CODE_0, replyCodeInteger)); |
| } |
| this.readerShuttingDown = true; |
| requestClose(err); |
| break; |
| } |
| if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) { |
| this.asyncDistributionTimeout = dis.readInt(); |
| this.asyncQueueTimeout = dis.readInt(); |
| this.asyncMaxQueueSize = (long)dis.readInt() * (1024*1024); |
| if (this.asyncDistributionTimeout != 0) { |
| logger.info(LocalizedMessage.create(LocalizedStrings. |
| Connection_0_ASYNC_CONFIGURATION_RECEIVED_1, |
| new Object[] {p2pReaderName(), |
| " asyncDistributionTimeout=" |
| + this.asyncDistributionTimeout |
| + " asyncQueueTimeout=" + this.asyncQueueTimeout |
| + " asyncMaxQueueSize=" |
| + (this.asyncMaxQueueSize / (1024*1024))})); |
| } |
| // read the product version ordinal for on-the-fly serialization |
| // transformations (for rolling upgrades) |
| this.remoteVersion = Version.readVersion(dis, true); |
| } |
| notifyHandshakeWaiter(true); |
| } else { |
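            // receiver side of the handshake: a zero pad byte, the handshake
            // version, the sender's member id, the sharedResource and
            // preserveOrder flags, the connection's uniqueId, the sender's
            // product version and, for newer peers, a domino count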
| byte b = dis.readByte(); |
| if (b != 0) { |
| throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_OLD_VERSION_PRE_5_0_1_OF_GEMFIRE_OR_NONGEMFIRE_DURING_HANDSHAKE_DUE_TO_INITIAL_BYTE_BEING_0.toLocalizedString(new Byte(b))); |
| } |
| byte handShakeByte = dis.readByte(); |
| if (handShakeByte != HANDSHAKE_VERSION) { |
| throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1 |
| .toLocalizedString(new Object[]{new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)})); |
| } |
| InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis); |
| Stub stub = new Stub(remote.getIpAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId()); |
| setRemoteAddr(remote, stub); |
| Thread.currentThread().setName(LocalizedStrings.Connection_P2P_MESSAGE_READER_FOR_0.toLocalizedString(this.remoteAddr, this.socket.getPort())); |
| this.sharedResource = dis.readBoolean(); |
| this.preserveOrder = dis.readBoolean(); |
| this.uniqueId = dis.readLong(); |
| // read the product version ordinal for on-the-fly serialization |
| // transformations (for rolling upgrades) |
| this.remoteVersion = Version.readVersion(dis, true); |
| int dominoNumber = 0; |
| if (this.remoteVersion == null || |
| (this.remoteVersion.compareTo(Version.GFE_80) >= 0) ) { |
| dominoNumber = dis.readInt(); |
| if (this.sharedResource) { |
| dominoNumber = 0; |
| } |
| dominoCount.set(dominoNumber); |
| // this.senderName = dis.readUTF(); |
| } |
| |
| if (!this.sharedResource) { |
| if (tipDomino()) { |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_THREAD_OWNED_RECEIVER_FORCING_ITSELF_TO_SEND_ON_THREAD_OWNED_SOCKETS)); |
| // bug #49565 - if domino count is >= 2 use shared resources. |
| // Also see DistributedCacheOperation#supportsDirectAck |
| } else { // if (dominoNumber < 2){ |
| ConnectionTable.threadWantsOwnResources(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets", dominoNumber); |
| } |
| // } else { |
| // ConnectionTable.threadWantsSharedResources(); |
| // logger.fine("thread-owned receiver with domino count of " + dominoNumber + " will prefer shared sockets"); |
| } |
| this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} remoteId is {} {}", p2pReaderName(), this.remoteId, |
| (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : "")); |
| } |
| |
| String authInit = System |
| .getProperty(DistributionConfigImpl.SECURITY_SYSTEM_PREFIX |
| + DistributionConfig.SECURITY_PEER_AUTH_INIT_NAME); |
| boolean isSecure = authInit != null && authInit.length() != 0; |
| |
| if (isSecure) { |
| // ARB: wait till member authentication has been confirmed? |
| if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) { |
| sendOKHandshakeReply(); // fix for bug 33224 |
| notifyHandshakeWaiter(true); |
| } |
| else { |
| // ARB: throw exception?? |
| notifyHandshakeWaiter(false); |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_0_TIMED_OUT_DURING_A_MEMBERSHIP_CHECK, p2pReaderName())); |
| } |
| } |
| else { |
| sendOKHandshakeReply(); // fix for bug 33224 |
| notifyHandshakeWaiter(true); |
| } |
| } |
| if (!this.isReceiver |
| && (this.handshakeRead || this.handshakeCancelled)) { |
| if (logger.isDebugEnabled()) { |
| if (this.handshakeRead) { |
| logger.debug("{} handshake has been read {}", p2pReaderName(), this); |
| } else { |
| logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this); |
| } |
| } |
| // Once we have read the handshake the reader can go away |
| break; |
| } |
| continue; |
| } |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_STRAY_INTERRUPT_READING_MESSAGE, p2pReaderName()), e); |
| continue; |
| } |
| catch (Exception ioe) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ioe); // bug 37101 |
| if (!stopped) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_ERROR_READING_MESSAGE, p2pReaderName()), ioe); |
| } |
| continue; |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| String ccMsg = p2pReaderName() + " Cancelled: " + this; |
| if (e.getMessage() != null) { |
| ccMsg += ": " + e.getMessage(); |
| } |
| logger.debug(ccMsg); |
| } |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_CACHECLOSED_IN_CHANNEL_READ_0.toLocalizedString(e)); |
| } catch (Exception ex) {} |
| this.stopped = true; |
| } |
| catch (IOException io) { |
| boolean closed = isSocketClosed() |
| || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed for Solaris jdk 1.4.2_08 |
| if (!closed) { |
| if (logger.isDebugEnabled() && !isIgnorableIOException(io)) { |
| logger.debug("{} io exception for {}", p2pReaderName(), this, io); |
| } |
| } |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_IOEXCEPTION_RECEIVED_0.toLocalizedString(io)); |
| } catch (Exception ex) {} |
| |
| if (closed) { |
| stopped = true; |
| } |
| else { |
| // sleep a bit to avoid a hot error loop |
| try { Thread.sleep(1000); } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| return; |
| } |
| break; |
| } |
| } |
| } // IOException |
| catch (Exception e) { |
| if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| return; // bug 37101 |
| } |
| if (!stopped && !(e instanceof InterruptedException) ) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_RECEIVED, p2pReaderName()), e); |
| } |
| if (isSocketClosed()) { |
| stopped = true; |
| } |
| else { |
| this.readerShuttingDown = true; |
| try { requestClose(LocalizedStrings.Connection_0_EXCEPTION_RECEIVED.toLocalizedString(e)); } catch (Exception ex) {} |
| |
| // sleep a bit to avoid a hot error loop |
| try { Thread.sleep(1000); } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| break; |
| } |
| } |
| } |
| } |
| } |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE") |
| final int readFully(InputStream input, byte[] buffer, int len) throws IOException { |
| int bytesSoFar = 0; |
| while (bytesSoFar < len) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| try { |
| synchronized(stateLock) { |
| connectionState = STATE_READING; |
| } |
| int bytesThisTime = input.read(buffer, bytesSoFar, len-bytesSoFar); |
| if (bytesThisTime < 0) { |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_STREAM_READ_RETURNED_NONPOSITIVE_LENGTH.toLocalizedString()); |
| } catch (Exception ex) { } |
| return -1; |
| } |
| bytesSoFar += bytesThisTime; |
| } |
| catch (InterruptedIOException io) { |
| // try { Thread.sleep(10); } |
| // catch (InterruptedException ie) { |
| // Thread.currentThread().interrupt(); |
| // } |
| |
| // Current thread has been interrupted. Regard it similar to an EOF |
| this.readerShuttingDown = true; |
| try { |
| requestClose(LocalizedStrings.Connection_CURRENT_THREAD_INTERRUPTED.toLocalizedString()); |
| } catch (Exception ex) { } |
| Thread.currentThread().interrupt(); |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| } |
| finally { |
| synchronized(stateLock) { |
| connectionState = STATE_IDLE; |
| } |
| } |
| } // while |
| return len; |
| } |
| |
| /** |
| * sends a serialized message to the other end of this connection. This |
| is used by the DirectChannel in GemFire when the message is going to |
| be sent to multiple recipients. |
| * @throws ConnectionException if the conduit has stopped |
| */ |
| public void sendPreserialized(ByteBuffer buffer, |
| boolean cacheContentChanges, DistributionMessage msg) |
| throws IOException, ConnectionException |
| { |
| if (!connected) { |
| throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteId)); |
| } |
| if (this.batchFlusher != null) { |
| batchSend(buffer); |
| return; |
| } |
| final boolean origSocketInUse = this.socketInUse; |
| byte originalState = -1; |
| synchronized (stateLock) { |
      originalState = this.connectionState;
| this.connectionState = STATE_SENDING; |
| } |
| this.socketInUse = true; |
| try { |
| if (useNIO()) { |
| SocketChannel channel = getSocket().getChannel(); |
| nioWriteFully(channel, buffer, false, msg); |
| } else { |
| if (buffer.hasArray()) { |
| this.output.write(buffer.array(), buffer.arrayOffset(), |
| buffer.limit() - buffer.position()); |
| } else { |
| byte[] bytesToWrite = getBytesToWrite(buffer); |
| synchronized(outLock) { |
| try { |
| // this.writerThread = Thread.currentThread(); |
| this.output.write(bytesToWrite); |
| this.output.flush(); |
| } |
| finally { |
| // this.writerThread = null; |
| } |
| } |
| } |
| } |
| if (cacheContentChanges) { |
| messagesSent++; |
| } |
| } finally { |
| accessed(); |
| this.socketInUse = origSocketInUse; |
| synchronized (stateLock) { |
| this.connectionState = originalState; |
| } |
| } |
| } |
| /** |
| * If <code>use</code> is true then "claim" the connection for our use. |
| * If <code>use</code> is false then "release" the connection. |
| * Fixes bug 37657. |
| * @return true if connection was already in use at time of call; |
| * false if not. |
| */ |
| public boolean setInUse(boolean use, long startTime, long ackWaitThreshold, long ackSAThreshold, List connectionGroup) { |
| // just do the following; EVEN if the connection has been closed |
| final boolean origSocketInUse = this.socketInUse; |
| synchronized(this) { |
| if (use && (ackWaitThreshold > 0 || ackSAThreshold > 0)) { |
| // set times that events should be triggered |
| this.transmissionStartTime = startTime; |
| this.ackWaitTimeout = ackWaitThreshold; |
| this.ackSATimeout = ackSAThreshold; |
| this.ackConnectionGroup = connectionGroup; |
| this.ackThreadName = Thread.currentThread().getName(); |
| } |
| else { |
| this.ackWaitTimeout = 0; |
| this.ackSATimeout = 0; |
| this.ackConnectionGroup = null; |
| this.ackThreadName = null; |
| } |
| synchronized (this.stateLock) { |
| this.connectionState = STATE_IDLE; |
| } |
| this.socketInUse = use; |
| } |
| if (!use) { |
| accessed(); |
| } |
| return origSocketInUse; |
| } |
| |
| |
| /** ensure that a task is running to monitor transmission and reading of acks */ |
| public synchronized void scheduleAckTimeouts() { |
| if (ackTimeoutTask == null) { |
| final long msAW = this.owner.getDM().getConfig().getAckWaitThreshold() * 1000; |
| final long msSA = this.owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000; |
| ackTimeoutTask = new SystemTimer.SystemTimerTask() { |
| @Override |
| public void run2() { |
| if (owner.isClosed()) { |
| return; |
| } |
| byte connState = -1; |
| synchronized (stateLock) { |
| connState = connectionState; |
| } |
| boolean sentAlert = false; |
| synchronized(Connection.this) { |
| if (socketInUse) { |
| switch (connState) { |
| case Connection.STATE_IDLE: |
| break; |
| case Connection.STATE_SENDING: |
| sentAlert = doSevereAlertProcessing(); |
| break; |
| case Connection.STATE_POST_SENDING: |
| break; |
| case Connection.STATE_READING_ACK: |
| sentAlert = doSevereAlertProcessing(); |
| break; |
| case Connection.STATE_RECEIVED_ACK: |
| break; |
| default: |
| } |
| } |
| } |
| if (sentAlert) { |
| // since transmission and ack-receipt are performed serially, we don't |
| // want to complain about all receivers out just because one was slow. We therefore reset |
| // the time stamps and give others more time |
| for (Iterator it=ackConnectionGroup.iterator(); it.hasNext(); ) { |
| Connection con = (Connection)it.next(); |
| if (con != Connection.this) { |
| con.transmissionStartTime += con.ackSATimeout; |
| } |
| } |
| } |
| } |
| }; |
| |
| synchronized(owner) { |
| SystemTimer timer = owner.getIdleConnTimer(); |
| if (timer != null) { |
| if (msSA > 0) { |
| timer.scheduleAtFixedRate(ackTimeoutTask, msAW, Math.min(msAW, msSA)); |
| } |
| else { |
| timer.schedule(ackTimeoutTask, msAW); |
| } |
| } |
| } |
| } |
| } |
| |
| /** ack-wait-threshold and ack-severe-alert-threshold processing */ |
| protected boolean doSevereAlertProcessing() { |
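    // Example (thresholds come from ack-wait-threshold and
    // ack-severe-alert-threshold in the distributed system config): with
    // ack-wait-threshold=15s and ack-severe-alert-threshold=10s, the warning
    // and suspect processing fire 15s after transmissionStartTime, and the
    // severe-alert branch fires at 25s, returning true so the caller can give
    // the other connections in the group more time.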
| long now = System.currentTimeMillis(); |
| if (ackSATimeout > 0 |
| && (transmissionStartTime + ackWaitTimeout + ackSATimeout) <= now) { |
| logger.fatal(LocalizedMessage.create( |
| LocalizedStrings.Connection_0_SECONDS_HAVE_ELAPSED_WAITING_FOR_A_RESPONSE_FROM_1_FOR_THREAD_2, |
| new Object[] {Long.valueOf((ackWaitTimeout+ackSATimeout)/1000), getRemoteAddress(), ackThreadName})); |
| // turn off subsequent checks by setting the timeout to zero, then boot the member |
| ackSATimeout = 0; |
| return true; |
| } |
| else if (!ackTimedOut |
| && (0 < ackWaitTimeout) |
| && (transmissionStartTime + ackWaitTimeout) <= now) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_0_SECONDS_HAVE_ELAPSED_WAITING_FOR_A_RESPONSE_FROM_1_FOR_THREAD_2, |
| new Object[] {Long.valueOf(ackWaitTimeout/1000), getRemoteAddress(), ackThreadName})); |
| ackTimedOut = true; |
| |
| final StringId state = (connectionState == Connection.STATE_SENDING)? |
| LocalizedStrings.Connection_TRANSMIT_ACKWAITTHRESHOLD : LocalizedStrings.Connection_RECEIVE_ACKWAITTHRESHOLD; |
| if (ackSATimeout > 0) { |
| this.owner.getDM().getMembershipManager() |
| .suspectMembers(Collections.singleton(getRemoteAddress()), state.toLocalizedString()); |
| } |
| } |
| return false; |
| } |
| |
| static private byte[] getBytesToWrite(ByteBuffer buffer) { |
| byte[] bytesToWrite = new byte[buffer.limit()]; |
| buffer.get(bytesToWrite); |
| return bytesToWrite; |
| } |
| |
| // private String socketInfo() { |
| // return (" socket: " + getSocket().getLocalAddress() + ":" + getSocket().getLocalPort() + " -> " + |
| // getSocket().getInetAddress() + ":" + getSocket().getPort() + " connection = " + System.identityHashCode(this)); |
| // |
| // } |
| |
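  /**
   * Copies the buffer onto the outgoing queue, conflating it with any queued
   * entry that shares its ConflationKey. Returns false if queuing is no
   * longer in progress (and force is false) or if the queue overflowed and
   * the slow receiver was asked to disconnect; returns true once queued.
   * @throws ConnectionException if a disconnect has already been requested
   */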
| private final boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, |
| boolean force) throws ConnectionException { |
| final DMStats stats = this.owner.getConduit().stats; |
| long start = DistributionStats.getStatTime(); |
| try { |
| ConflationKey ck = null; |
| if (msg != null) { |
| ck = msg.getConflationKey(); |
| } |
| Object objToQueue = null; |
| // if we can conflate delay the copy to see if we can reuse |
| // an already allocated buffer. |
| final int newBytes = buffer.remaining(); |
| final int origBufferPos = buffer.position(); // to fix bug 34832 |
| if (ck == null || !ck.allowsConflation()) { |
| // do this outside of sync for multi thread perf |
| ByteBuffer newbb = ByteBuffer.allocate(newBytes); |
| newbb.put(buffer); |
| newbb.flip(); |
| objToQueue = newbb; |
| } |
| synchronized (this.outgoingQueue) { |
| if (this.disconnectRequested) { |
| buffer.position(origBufferPos); |
| // we have given up so just drop this message. |
| throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteId)); |
| } |
| if (!force && !this.asyncQueuingInProgress) { |
| // reset buffer since we will be sending it. This fixes bug 34832 |
| buffer.position(origBufferPos); |
| // the pusher emptied the queue so don't add since we are not forced to. |
| return false; |
| } |
| boolean didConflation = false; |
| if (ck != null) { |
| if (ck.allowsConflation()) { |
| objToQueue = ck; |
| Object oldValue = this.conflatedKeys.put(ck, ck); |
| if (oldValue != null) { |
| ConflationKey oldck = (ConflationKey)oldValue; |
| ByteBuffer oldBuffer = oldck.getBuffer(); |
| // need to always do this to allow old buffer to be gc'd |
| oldck.setBuffer(null); |
| // remove the conflated key from current spot in queue |
| // Note we no longer remove from the queue because the search |
| // can be expensive on large queues. Instead we just wait for |
| // the queue removal code to find the oldck and ignore it since |
| // its buffer is null |
| // We do a quick check of the last thing in the queue |
| // and if it has the same identity of our last thing then |
| // remove it |
| if (this.outgoingQueue.getLast() == oldck) { |
| this.outgoingQueue.removeLast(); |
| } |
| int oldBytes = oldBuffer.remaining(); |
| this.queuedBytes -= oldBytes; |
| stats.incAsyncQueueSize(-oldBytes); |
| stats.incAsyncConflatedMsgs(); |
| didConflation = true; |
| if (oldBuffer.capacity() >= newBytes) { |
| // copy new buffer into oldBuffer |
| oldBuffer.clear(); |
| oldBuffer.put(buffer); |
| oldBuffer.flip(); |
| ck.setBuffer(oldBuffer); |
| } else { |
| // old buffer was not large enough |
| oldBuffer = null; |
| ByteBuffer newbb = ByteBuffer.allocate(newBytes); |
| newbb.put(buffer); |
| newbb.flip(); |
| ck.setBuffer(newbb); |
| } |
| } else { |
| // no old buffer so need to allocate one |
| ByteBuffer newbb = ByteBuffer.allocate(newBytes); |
| newbb.put(buffer); |
| newbb.flip(); |
| ck.setBuffer(newbb); |
| } |
| } else { |
| // just forget about having a conflatable operation |
| /*Object removedVal =*/ this.conflatedKeys.remove(ck); |
| } |
| } |
| { |
| long newQueueSize = newBytes + this.queuedBytes; |
| if (newQueueSize > this.asyncMaxQueueSize) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_QUEUED_BYTES_0_EXCEEDS_MAX_OF_1_ASKING_SLOW_RECEIVER_2_TO_DISCONNECT, |
| new Object[] {newQueueSize, this.asyncMaxQueueSize, this.remoteAddr })); |
| stats.incAsyncQueueSizeExceeded(1); |
| disconnectSlowReceiver(); |
| // reset buffer since we will be sending it |
| buffer.position(origBufferPos); |
| return false; |
| } |
| } |
| this.outgoingQueue.addLast(objToQueue); |
| this.queuedBytes += newBytes; |
| stats.incAsyncQueueSize(newBytes); |
| if (!didConflation) { |
| stats.incAsyncQueuedMsgs(); |
| } |
| return true; |
| } |
| } finally { |
| if (DistributionStats.enableClockStats) { |
| stats.incAsyncQueueAddTime(DistributionStats.getStatTime() - start); |
| } |
| } |
| } |
| /** |
| * Return true if it was able to handle a block write of the given buffer. |
| * Return false if it is still the caller is still responsible for writing it. |
| * @throws ConnectionException if the conduit has stopped |
| */ |
| private final boolean handleBlockedWrite(ByteBuffer buffer, |
| DistributionMessage msg) throws ConnectionException |
| { |
| if (!addToQueue(buffer, msg, true)) { |
| return false; |
| } else { |
| startNioPusher(); |
| return true; |
| } |
| } |
| |
| private final Object nioPusherSync = new Object(); |
| |
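  /** Starts the single async pusher thread for this connection, first
      waiting for any previous pusher to exit; asyncQueuingInProgress is set
      while nioPusherSync is held. */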
| private void startNioPusher() { |
| synchronized (this.nioPusherSync) { |
| while (this.pusherThread != null) { |
| // wait for previous pusher thread to exit |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.nioPusherSync.wait(); // spurious wakeup ok |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| this.asyncQueuingInProgress = true; |
| ThreadGroup group = |
| LoggingThreadGroup.createThreadGroup("P2P Writer Threads", logger); |
| this.pusherThread = new Thread(group, new Runnable() { |
| public void run() { |
| Connection.this.runNioPusher(); |
| } |
| }, "P2P async pusher to " + this.remoteAddr); |
| this.pusherThread.setDaemon(true); |
| } // synchronized |
| this.pusherThread.start(); |
| } |
| |
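  /** Removes and returns the next queued buffer, skipping ConflationKey
      entries whose buffers were nulled out by a later conflated put. Returns
      null (and ends queuing mode, notifying waiters) once the queue is empty
      or a disconnect has been requested. */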
| private final ByteBuffer takeFromOutgoingQueue() throws InterruptedException { |
| ByteBuffer result = null; |
| final DMStats stats = this.owner.getConduit().stats; |
| long start = DistributionStats.getStatTime(); |
| try { |
| synchronized (this.outgoingQueue) { |
| if (this.disconnectRequested) { |
| // don't bother with anymore work since we are done |
| this.asyncQueuingInProgress = false; |
| this.outgoingQueue.notifyAll(); |
| return null; |
| } |
| //Object o = this.outgoingQueue.poll(); |
| do { |
| if (this.outgoingQueue.isEmpty()) { |
| break; |
| } |
| Object o = this.outgoingQueue.removeFirst(); |
| if (o == null) { |
| break; |
| } |
| if (o instanceof ConflationKey) { |
| result = ((ConflationKey)o).getBuffer(); |
| if (result != null) { |
| this.conflatedKeys.remove(o); |
| } else { |
| // if result is null then this same key will be found later in the |
| // queue so we just need to skip this entry |
| continue; |
| } |
| } else { |
| result = (ByteBuffer)o; |
| } |
| int newBytes = result.remaining(); |
| this.queuedBytes -= newBytes; |
| stats.incAsyncQueueSize(-newBytes); |
| stats.incAsyncDequeuedMsgs(); |
| } while (result == null); |
| if (result == null) { |
| this.asyncQueuingInProgress = false; |
| this.outgoingQueue.notifyAll(); |
| } |
| } |
| return result; |
| } finally { |
| if (DistributionStats.enableClockStats) { |
| stats.incAsyncQueueRemoveTime(DistributionStats.getStatTime() - start); |
| } |
| } |
| } |
| |
| private boolean disconnectRequested = false; |
| |
| |
| /** |
| * @since 4.2.2 |
| */ |
| private void disconnectSlowReceiver() { |
| synchronized (this.outgoingQueue) { |
| if (this.disconnectRequested) { |
| // only ask once |
| return; |
| } |
| this.disconnectRequested = true; |
| } |
| DM dm = this.owner.getDM(); |
| if (dm == null) { |
| this.owner.removeEndpoint(this.remoteId, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString()); |
| return; |
| } |
| dm.getMembershipManager().requestMemberRemoval(this.remoteAddr, |
| LocalizedStrings.Connection_DISCONNECTED_AS_A_SLOWRECEIVER.toLocalizedString()); |
| // Ok, we sent the message, the coordinator should kick the member out |
| // immediately and inform this process with a new view. |
| // Let's wait |
| // for that to happen and if it doesn't in X seconds |
| // then remove the endpoint. |
    final int FORCE_TIMEOUT = 3000;
    final long deadline = System.currentTimeMillis() + FORCE_TIMEOUT;
    while (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)
        && System.currentTimeMillis() < deadline) {
      try {
        Thread.sleep(50);
      }
      catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
        return;
      }
    }
    if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
      // the coordinator never kicked the member out; give up on it ourselves
      if (logger.isDebugEnabled()) {
        logger.debug("Force disconnect timed out after waiting {} seconds", (FORCE_TIMEOUT/1000));
      }
      this.owner.removeEndpoint(this.remoteId,
          LocalizedStrings.Connection_FORCE_DISCONNECT_TIMED_OUT.toLocalizedString());
    }
| } |
| |
| /** |
| * have the pusher thread check for queue overflow |
| * and for idle time exceeded |
| */ |
| protected void runNioPusher() { |
| try { |
| final DMStats stats = this.owner.getConduit().stats; |
| final long threadStart = stats.startAsyncThread(); |
| try { |
| stats.incAsyncQueues(1); |
| stats.incAsyncThreads(1); |
| |
| try { |
| int flushId = 0; |
| while (this.asyncQueuingInProgress && this.connected) { |
| if (SystemFailure.getFailure() != null) { |
| // Allocate no objects here! |
| Socket s = this.socket; |
| if (s != null) { |
| try { |
| s.close(); |
| } |
| catch (IOException e) { |
| // don't care |
| } |
| } |
| SystemFailure.checkFailure(); // throws |
| } |
| if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { |
| break; |
| } |
| flushId++; |
| long flushStart = stats.startAsyncQueueFlush(); |
| try { |
| long curQueuedBytes = this.queuedBytes; |
| if (curQueuedBytes > this.asyncMaxQueueSize) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_QUEUED_BYTES_0_EXCEEDS_MAX_OF_1_ASKING_SLOW_RECEIVER_2_TO_DISCONNECT, |
| new Object[]{ curQueuedBytes, this.asyncMaxQueueSize, this.remoteAddr})); |
| stats.incAsyncQueueSizeExceeded(1); |
| disconnectSlowReceiver(); |
| return; |
| } |
| SocketChannel channel = getSocket().getChannel(); |
| ByteBuffer bb = takeFromOutgoingQueue(); |
| if (bb == null) { |
| if (logger.isDebugEnabled() && flushId == 1) { |
| logger.debug("P2P pusher found empty queue"); |
| } |
| return; |
| } |
| nioWriteFully(channel, bb, true, null); |
| // We should not add messagesSent here according to Bruce. |
| // The counts are increased elsewhere. |
| // messagesSent++; |
| accessed(); |
| } finally { |
| stats.endAsyncQueueFlush(flushStart); |
| } |
| } // while |
| } finally { |
| // need to force this to false before doing the requestClose calls |
| synchronized (this.outgoingQueue) { |
| this.asyncQueuingInProgress = false; |
| this.outgoingQueue.notifyAll(); |
| } |
| } |
| } catch (InterruptedException ex) { |
| // someone wants us to stop. |
| // No need to set interrupt bit, we're quitting. |
| // No need to throw an error, we're quitting. |
| } catch (IOException ex) { |
| final String err = LocalizedStrings.Connection_P2P_PUSHER_IO_EXCEPTION_FOR_0.toLocalizedString(this); |
| if (! isSocketClosed() ) { |
| if (logger.isDebugEnabled() && !isIgnorableIOException(ex)) { |
| logger.debug(err, ex); |
| } |
| } |
| try { requestClose(err + ": " + ex); } catch (Exception ignore) {} |
| } |
| catch (CancelException ex) { // bug 37367 |
| final String err = LocalizedStrings.Connection_P2P_PUSHER_0_CAUGHT_CACHECLOSEDEXCEPTION_1.toLocalizedString(new Object[] {this, ex}); |
| logger.debug(err); |
| try { requestClose(err); } catch (Exception ignore) {} |
| return; |
| } |
| catch (Exception ex) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); // bug 37101 |
| if (! isSocketClosed() ) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_P2P_PUSHER_EXCEPTION_0, ex), ex); |
| } |
| try { requestClose(LocalizedStrings.Connection_P2P_PUSHER_EXCEPTION_0.toLocalizedString(ex)); } catch (Exception ignore) {} |
| } finally { |
| stats.incAsyncQueueSize(-this.queuedBytes); |
| this.queuedBytes = 0; |
| stats.endAsyncThread(threadStart); |
| stats.incAsyncThreads(-1); |
| stats.incAsyncQueues(-1); |
| if (logger.isDebugEnabled()) { |
| logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteId, remoteAddr); |
| } |
| } |
| } finally { |
| synchronized (this.nioPusherSync) { |
| this.pusherThread = null; |
| this.nioPusherSync.notify(); |
| } |
| } |
| } |
| |
| /** |
| * Return false if socket writes to be done async/nonblocking |
| * Return true if socket writes to be done sync/blocking |
| */ |
| private final boolean useSyncWrites(boolean forceAsync) { |
| if (forceAsync) { |
| return false; |
| } |
| // only use sync writes if: |
| // we are already queuing |
| if (this.asyncQueuingInProgress) { |
| // it will just tack this msg onto the outgoing queue |
| return true; |
| } |
| // or we are a receiver |
| if (this.isReceiver) { |
| return true; |
| } |
| // or we are an unordered connection |
| if (!this.preserveOrder) { |
| return true; |
| } |
| // or the receiver does not allow queuing |
| if (this.asyncDistributionTimeout == 0) { |
| return true; |
| } |
| // OTHERWISE return false and let caller send async |
| return false; |
| } |
| |
| /** |
| * If true then act as if the socket buffer is full and start async queuing |
| */ |
| public static volatile boolean FORCE_ASYNC_QUEUE = false; |
| |
| static private final int MAX_WAIT_TIME = (1<<5); // ms (must be a power of 2) |
| |
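  /**
   * Performs a non-blocking write, backing off exponentially (1ms doubling up
   * to MAX_WAIT_TIME) while the socket buffer stays full. Once the configured
   * async-distribution-timeout is exceeded, the remainder of the message is
   * queued and the async pusher takes over; a receiver that stays blocked
   * past async-queue-timeout, or whose queue exceeds async-max-queue-size, is
   * asked to disconnect.
   */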
| private final void writeAsync(SocketChannel channel, |
| ByteBuffer buffer, boolean forceAsync, DistributionMessage p_msg, |
| final DMStats stats) throws IOException { |
| DistributionMessage msg = p_msg; |
| // async/non-blocking |
| boolean socketWriteStarted = false; |
| long startSocketWrite = 0; |
| int retries = 0; |
| int totalAmtWritten = 0; |
| try { |
| synchronized (this.outLock) { |
| if (!forceAsync) { |
| // check one more time while holding outLock in case a pusher was created |
| if (this.asyncQueuingInProgress) { |
| if (addToQueue(buffer, msg, false)) { |
| return; |
| } |
| // fall through |
| } |
| } |
| socketWriteStarted = true; |
| startSocketWrite = stats.startSocketWrite(false); |
| long now = System.currentTimeMillis(); |
| int waitTime = 1; |
| long distributionTimeoutTarget = 0; |
| // if asyncDistributionTimeout == 1 then we want to start queuing |
| // as soon as we do a non blocking socket write that returns 0 |
| if (this.asyncDistributionTimeout != 1) { |
| distributionTimeoutTarget = now + this.asyncDistributionTimeout; |
| } |
| long queueTimeoutTarget = now + this.asyncQueueTimeout; |
| channel.configureBlocking(false); |
| try { |
| do { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| retries++; |
| int amtWritten; |
| if (FORCE_ASYNC_QUEUE) { |
| amtWritten = 0; |
| } else { |
| amtWritten = channel.write(buffer); |
| } |
| if (amtWritten == 0) { |
| now = System.currentTimeMillis(); |
| long timeoutTarget; |
| if (!forceAsync) { |
| if (now > distributionTimeoutTarget) { |
| if (logger.isDebugEnabled()) { |
| if (distributionTimeoutTarget == 0) { |
| logger.debug("Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked."); |
| } else { |
| long blockedMs = now - distributionTimeoutTarget; |
| blockedMs += this.asyncDistributionTimeout; |
| logger.debug("Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.", |
| blockedMs, this.asyncDistributionTimeout); |
| } |
| } |
| stats.incAsyncDistributionTimeoutExceeded(); |
| if (totalAmtWritten > 0) { |
| // we have written part of the msg to the socket buffer |
| // and we are going to queue the remainder. |
| // We set msg to null so that will not make |
| // the partial msg a candidate for conflation. |
| msg = null; |
| } |
| if (handleBlockedWrite(buffer, msg)) { |
| return; |
| } |
| } |
| timeoutTarget = distributionTimeoutTarget; |
| } else { |
| boolean disconnectNeeded = false; |
| long curQueuedBytes = this.queuedBytes; |
| if (curQueuedBytes > this.asyncMaxQueueSize) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_QUEUED_BYTES_0_EXCEEDS_MAX_OF_1_ASKING_SLOW_RECEIVER_2_TO_DISCONNECT, |
| new Object[]{ Long.valueOf(curQueuedBytes), Long.valueOf(this.asyncMaxQueueSize), this.remoteAddr})); |
| stats.incAsyncQueueSizeExceeded(1); |
| disconnectNeeded = true; |
| } |
| if (now > queueTimeoutTarget) { |
| // we have waited long enough |
| // the pusher has been idle too long! |
| long blockedMs = now - queueTimeoutTarget; |
| blockedMs += this.asyncQueueTimeout; |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_BLOCKED_FOR_0_MS_WHICH_IS_LONGER_THAN_THE_MAX_OF_1_MS_ASKING_SLOW_RECEIVER_2_TO_DISCONNECT, |
| new Object[] {Long.valueOf(blockedMs), Integer.valueOf(this.asyncQueueTimeout), this.remoteAddr})); |
| stats.incAsyncQueueTimeouts(1); |
| disconnectNeeded = true; |
| } |
| if (disconnectNeeded) { |
| disconnectSlowReceiver(); |
| synchronized (this.outgoingQueue) { |
| this.asyncQueuingInProgress = false; |
| this.outgoingQueue.notifyAll(); // for bug 42330 |
| } |
| return; |
| } |
| timeoutTarget = queueTimeoutTarget; |
| } |
| { |
| long msToWait = waitTime; |
| long msRemaining = timeoutTarget - now; |
| if (msRemaining > 0) { |
| msRemaining /= 2; |
| } |
| if (msRemaining < msToWait) { |
| msToWait = msRemaining; |
| } |
| if (msToWait <= 0) { |
| Thread.yield(); |
| } else { |
                boolean interrupted = Thread.interrupted();
| try { |
| Thread.sleep(msToWait); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| if (waitTime < MAX_WAIT_TIME) { |
| // double it since it is not yet the max |
| waitTime <<= 1; |
| } |
| } // amtWritten == 0 |
| else { |
| totalAmtWritten += amtWritten; |
| // reset queueTimeoutTarget since we made some progress |
| queueTimeoutTarget = System.currentTimeMillis() + this.asyncQueueTimeout; |
| waitTime = 1; |
| } |
| } while (buffer.remaining() > 0); |
| } finally { |
| channel.configureBlocking(true); |
| } |
| } |
| } finally { |
| if (socketWriteStarted) { |
| if (retries > 0) { |
| retries--; |
| } |
| stats.endSocketWrite(false, startSocketWrite, totalAmtWritten, retries); |
| } |
| } |
| } |
| |
| /** nioWriteFully implements a blocking write on a channel that is in |
| * non-blocking mode. |
| * @param channel the channel to write to |
| * @param buffer the bytes to write; fully written or queued on return |
| * @param forceAsync true if we need to force use of the async queue path. |
| * @param msg the message being written; may be null |
| * @throws IOException if writing to the channel fails |
| * @throws ConnectionException if the conduit has stopped |
| */ |
| protected final void nioWriteFully(SocketChannel channel, |
| ByteBuffer buffer, |
| boolean forceAsync, |
| DistributionMessage msg) |
| throws IOException, ConnectionException |
| { |
| final DMStats stats = this.owner.getConduit().stats; |
| if (!this.sharedResource) { |
| stats.incTOSentMsg(); |
| } |
| if (useSyncWrites(forceAsync)) { |
| if (this.asyncQueuingInProgress) { |
| if (addToQueue(buffer, msg, false)) { |
| return; |
| } |
| // fall through |
| } |
| long startLock = stats.startSocketLock(); |
| synchronized (this.outLock) { |
| stats.endSocketLock(startLock); |
| if (this.asyncQueuingInProgress) { |
| if (addToQueue(buffer, msg, false)) { |
| return; |
| } |
| // fall through |
| } |
| do { |
| int amtWritten = 0; |
| long start = stats.startSocketWrite(true); |
| try { |
| // this.writerThread = Thread.currentThread(); |
| amtWritten = channel.write(buffer); |
| } |
| finally { |
| stats.endSocketWrite(true, start, amtWritten, 0); |
| // this.writerThread = null; |
| } |
| } while (buffer.remaining() > 0); |
| } // synchronized |
| } |
| else { |
| writeAsync(channel, buffer, forceAsync, msg, stats); |
| } |
| } |
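| // A minimal usage sketch (the caller shown is hypothetical; only the |
| // method itself is real): a sender serializes a message into a |
| // ByteBuffer and calls |
| //   nioWriteFully(channel, buffer, false, msg); |
| // The call returns only once every byte has been written to the socket |
| // or handed off to the async queue, so callers need not loop on |
| // partial writes themselves. |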
| |
| /** gets the buffer used for receiving message bytes, allocating it on first use */ |
| protected ByteBuffer getNIOBuffer() { |
| final DMStats stats = this.owner.getConduit().stats; |
| if (nioInputBuffer == null) { |
| int allocSize = this.recvBufferSize; |
| if (allocSize == -1) { |
| allocSize = this.owner.getConduit().tcpBufferSize; |
| } |
| nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats); |
| } |
| return nioInputBuffer; |
| } |
| |
| /** |
| * stateLock is used to synchronize state changes. |
| */ |
| protected Object stateLock = new Object(); |
| |
| /** for timeout processing, this is the current state of the connection */ |
| protected byte connectionState; |
| |
| /*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/ |
| /** the connection is idle, but may be in use */ |
| protected static final byte STATE_IDLE = 0; |
| /** the connection is in use and is transmitting data */ |
| protected static final byte STATE_SENDING = 1; |
| /** the connection is in use and is done transmitting */ |
| protected static final byte STATE_POST_SENDING = 2; |
| /** the connection is in use and is reading a direct-ack */ |
| protected static final byte STATE_READING_ACK = 3; |
| /** the connection is in use and has finished reading a direct-ack */ |
| protected static final byte STATE_RECEIVED_ACK = 4; |
| /** the connection is in use and is reading a message */ |
| protected static final byte STATE_READING = 5; |
| |
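| /** printable names for the connection states, indexed by the STATE_* values above */ |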
| protected static final String[] STATE_NAMES = new String[] { |
| "idle", "sending", "post_sending", "reading_ack", "received_ack", "reading" }; |
| /*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/ |
| |
| /** set to true if we exceeded the ack-wait-threshold waiting for a response */ |
| protected volatile boolean ackTimedOut; |
| |
| private static final int ACK_SIZE = 1; |
| private static final byte ACK_BYTE = 37; |
| |
| /** |
| * @param msToWait number of milliseconds to wait for an ack. |
| * If 0 then wait forever. |
| * @param msInterval interval in milliseconds between checks |
| * @param processor the direct-reply processor used to process the ack |
| * @throws SocketTimeoutException if msToWait expires. |
| * @throws ConnectionException if ack is not received (fixes bug 34312) |
| */ |
| public void readAck(final int msToWait, final long msInterval, final DirectReplyProcessor processor) throws SocketTimeoutException, |
| ConnectionException { |
| if (isSocketClosed()) { |
| throw new ConnectionException(LocalizedStrings.Connection_CONNECTION_IS_CLOSED.toLocalizedString()); |
| } |
| synchronized (this.stateLock) { |
| this.connectionState = STATE_READING_ACK; |
| } |
| |
| |
| boolean origSocketInUse = this.socketInUse; |
| this.socketInUse = true; |
| MsgReader msgReader = null; |
| DMStats stats = owner.getConduit().stats; |
| final Version version = getRemoteVersion(); |
| try { |
| if(useNIO()) { |
| msgReader = new NIOMsgReader(this, version); |
| } else { |
| msgReader = new OioMsgReader(this, version); |
| } |
| |
| Header header = msgReader.readHeader(); |
| |
| ReplyMessage msg; |
| int len; |
| if(header.getNioMessageType() == NORMAL_MSG_TYPE) { |
| msg = (ReplyMessage) msgReader.readMessage(header); |
| len = header.getNioMessageLength(); |
| } else { |
| //TODO - really no need to go to shared map here, we could probably just cache an idle one. |
| MsgDestreamer destreamer = obtainMsgDestreamer( |
| header.getNioMessageId(), version); |
| while (header.getNioMessageType() == CHUNKED_MSG_TYPE) { |
| msgReader.readChunk(header, destreamer); |
| header = msgReader.readHeader(); |
| } |
| msgReader.readChunk(header, destreamer); |
| msg = (ReplyMessage) destreamer.getMessage(); |
| releaseMsgDestreamer(header.getNioMessageId(), destreamer); |
| len = destreamer.size(); |
| } |
| //I'd really just like to call dispatchMessage here. However, |
| //that call goes through a bunch of checks that knock off about |
| //10% of the performance. Since this direct-ack stuff is all |
| //about performance, we'll skip those checks. Skipping them |
| //should be legit, because we just sent a message so we know |
| //the member is already in our view, etc. |
| DistributionManager dm = (DistributionManager) owner.getDM(); |
| msg.setBytesRead(len); |
| msg.setSender(remoteAddr); |
| stats.incReceivedMessages(1L); |
| stats.incReceivedBytes(msg.getBytesRead()); |
| stats.incMessageChannelTime(msg.resetTimestamp()); |
| msg.process(dm, processor); |
| // dispatchMessage(msg, len, false); |
| } |
| catch (MemberShunnedException e) { |
| //do nothing |
| } |
| catch (SocketTimeoutException timeout) { |
| throw timeout; |
| } |
| catch (IOException e) { |
| final String err = LocalizedStrings.Connection_ACK_READ_IO_EXCEPTION_FOR_0.toLocalizedString(this); |
| if (! isSocketClosed() ) { |
| if (logger.isDebugEnabled() && !isIgnorableIOException(e)) { |
| logger.debug(err, e); |
| } |
| } |
| try { requestClose(err + ": " + e); } catch (Exception ex) {} |
| throw new ConnectionException(LocalizedStrings.Connection_UNABLE_TO_READ_DIRECT_ACK_BECAUSE_0.toLocalizedString(e)); |
| } |
| catch(ConnectionException e) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); |
| throw e; |
| } |
| catch (Exception e) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); |
| if (! isSocketClosed() ) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ACK_READ_EXCEPTION), e); |
| } |
| try { requestClose(LocalizedStrings.Connection_ACK_READ_EXCEPTION_0.toLocalizedString(e)); } catch (Exception ex) { /* ignore; already handling a read failure */ } |
| throw new ConnectionException(LocalizedStrings.Connection_UNABLE_TO_READ_DIRECT_ACK_BECAUSE_0.toLocalizedString(e)); |
| } |
| finally { |
| stats.incProcessedMessages(1L); |
| accessed(); |
| this.socketInUse = origSocketInUse; |
| if (this.ackTimedOut) { |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_FINISHED_WAITING_FOR_REPLY_FROM_0, |
| new Object[] {getRemoteAddress()})); |
| this.ackTimedOut = false; |
| } |
| if(msgReader != null) { |
| msgReader.close(); |
| } |
| } |
| synchronized (stateLock) { |
| this.connectionState = STATE_RECEIVED_ACK; |
| } |
| } |
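| // Direct-ack round trip, roughly (an inference from this file, not a |
| // spec): the sender transmits a message with DIRECT_ACK_BIT set and |
| // then blocks in readAck() on the same socket until the peer's |
| // ReplyMessage arrives, bypassing the usual dispatch path for speed. |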
| |
| |
| /** processes the current NIO buffer. If there are complete messages |
| in the buffer, they are deserialized and passed to TCPConduit for |
| further processing */ |
| private void processNIOBuffer() throws ConnectionException, IOException { |
| if (nioInputBuffer != null) { |
| nioInputBuffer.flip(); |
| } |
| boolean done = false; |
| |
| while (!done && connected) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| // long startTime = DistributionStats.getStatTime(); |
| int remaining = nioInputBuffer.remaining(); |
| if (nioLengthSet || remaining >= MSG_HEADER_BYTES) { |
| if (!nioLengthSet) { |
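| // The message header occupies MSG_HEADER_BYTES and is parsed below as: |
| // an int encoding the message length (and possibly a header version; |
| // see calcHdrVersion), a byte carrying the message type with |
| // DIRECT_ACK_BIT as a flag bit, and a short carrying the message id. |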
| int headerStartPos = nioInputBuffer.position(); |
| nioMessageLength = nioInputBuffer.getInt(); |
| /* nioMessageVersion = */ calcHdrVersion(nioMessageLength); |
| nioMessageLength = calcMsgByteSize(nioMessageLength); |
| nioMessageType = nioInputBuffer.get(); |
| nioMsgId = nioInputBuffer.getShort(); |
| directAck = (nioMessageType & DIRECT_ACK_BIT) != 0; |
| if (directAck) { |
| nioMessageType &= ~DIRECT_ACK_BIT; // clear the ack bit |
| } |
| // Following validation fixes bug 31145 |
| if (!validMsgType(nioMessageType)) { |
| Integer nioMessageTypeInteger = Integer.valueOf(nioMessageType); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0, nioMessageTypeInteger)); |
| this.readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0.toLocalizedString(nioMessageTypeInteger)); |
| break; |
| } |
| nioLengthSet = true; |
| // keep the header "in" the buffer until we have read the entire msg. |
| // Trust me: this will reduce copying on large messages. |
| nioInputBuffer.position(headerStartPos); |
| } |
| if (remaining >= nioMessageLength+MSG_HEADER_BYTES) { |
| nioLengthSet = false; |
| nioInputBuffer.position(nioInputBuffer.position()+MSG_HEADER_BYTES); |
| // don't trust the message deserialization to leave the position in |
| // the correct spot. Some of the serialization uses buffered |
| // streams that can leave the position at the wrong spot |
| int startPos = nioInputBuffer.position(); |
| int oldLimit = nioInputBuffer.limit(); |
| nioInputBuffer.limit(startPos+nioMessageLength); |
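| // position and limit now frame exactly this message's bytes, so |
| // deserialization cannot read past the end of the message; the old |
| // limit is restored once the message has been processed. |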
| if (this.handshakeRead) { |
| if (nioMessageType == NORMAL_MSG_TYPE) { |
| this.owner.getConduit().stats.incMessagesBeingReceived(true, nioMessageLength); |
| ByteBufferInputStream bbis = remoteVersion == null |
| ? new ByteBufferInputStream(nioInputBuffer) |
| : new VersionedByteBufferInputStream(nioInputBuffer, |
| remoteVersion); |
| DistributionMessage msg = null; |
| try { |
| ReplyProcessor21.initMessageRPId(); |
| // add serialization stats |
| long startSer = this.owner.getConduit().stats.startMsgDeserialization(); |
| msg = (DistributionMessage)InternalDataSerializer.readDSFID(bbis); |
| this.owner.getConduit().stats.endMsgDeserialization(startSer); |
| if (bbis.available() != 0) { |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES, |
| new Object[] { msg, Integer.valueOf(bbis.available())})); |
| } |
| try { |
| if (!dispatchMessage(msg, nioMessageLength, directAck)) { |
| directAck = false; |
| } |
| } |
| catch (MemberShunnedException e) { |
| directAck = false; // don't respond (bug39117) |
| } |
| catch (Exception de) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DISPATCHING_MESSAGE), de); |
| } |
| catch (ThreadDeath td) { |
| throw td; |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_THROWABLE_DISPATCHING_MESSAGE), t); |
| } |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| sendFailureReply(ReplyProcessor21.getMessageRPId(), LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE.toLocalizedString(), t, directAck); |
| if (t instanceof ThreadDeath) { |
| throw (ThreadDeath)t; |
| } |
| if(t instanceof CancelException) { |
| if (!(t instanceof CacheClosedException)) { |
| // Just log a message if we had trouble deserializing due to CacheClosedException; see bug 43543 |
| throw (CancelException) t; |
| } |
| } |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE), t); |
| } |
| finally { |
| ReplyProcessor21.clearMessageRPId(); |
| } |
| } |
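| // Chunked messages: each CHUNKED_MSG_TYPE buffer is appended to the |
| // MsgDestreamer registered under this nioMsgId; the final chunk |
| // arrives as END_CHUNKED_MSG_TYPE, after which the reassembled |
| // message is deserialized and dispatched. |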
| else if (nioMessageType == CHUNKED_MSG_TYPE) { |
| MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion); |
| this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, nioMessageLength); |
| try { |
| md.addChunk(nioInputBuffer, nioMessageLength); |
| } |
| catch (IOException ex) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex); |
| } |
| } |
| else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ { |
| //logger.info("END_CHUNK msgId="+nioMsgId); |
| MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion); |
| this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, nioMessageLength); |
| try { |
| md.addChunk(nioInputBuffer, nioMessageLength); |
| } |
| catch (IOException ex) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_HANDLING_END_CHUNK_MESSAGE), ex); |
| } |
| DistributionMessage msg = null; |
| int msgLength = 0; |
| String failureMsg = null; |
| Throwable failureEx = null; |
| int rpId = 0; |
| boolean interrupted = false; |
| try { |
| msg = md.getMessage(); |
| } |
| catch (ClassNotFoundException ex) { |
| this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); |
| failureMsg = LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE.toLocalizedString(); |
| failureEx = ex; |
| rpId = md.getRPid(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex)); |
| } |
| catch (IOException ex) { |
| this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); |
| failureMsg = LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE.toLocalizedString(); |
| failureEx = ex; |
| rpId = md.getRPid(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE), failureEx); |
| } |
| catch (InterruptedException ex) { |
| interrupted = true; |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable ex) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); |
| this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); |
| failureMsg = LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE.toLocalizedString(); |
| failureEx = ex; |
| rpId = md.getRPid(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE), failureEx); |
| } |
| finally { |
| msgLength = md.size(); |
| releaseMsgDestreamer(nioMsgId, md); |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (msg != null) { |
| try { |
| if (!dispatchMessage(msg, msgLength, directAck)) { |
| directAck = false; |
| } |
| } |
| catch (MemberShunnedException e) { |
| // not a member anymore - don't reply |
| directAck = false; |
| } |
| catch (Exception de) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DISPATCHING_MESSAGE), de); |
| } |
| catch (ThreadDeath td) { |
| throw td; |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_THROWABLE_DISPATCHING_MESSAGE), t); |
| } |
| } |
| else if (failureEx != null) { |
| sendFailureReply(rpId, failureMsg, failureEx, directAck); |
| } |
| } |
| } |
| else { |
| // read HANDSHAKE |
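| // Two cases follow. A connector (non-receiver) reads the handshake |
| // *reply* here: a reply code, optionally followed by the peer's async |
| // configuration and product version. A receiver instead reads the |
| // initiator's handshake: a zero byte, the handshake version, the |
| // remote member id, the sharedResource and preserveOrder flags, the |
| // uniqueId, the product version and, for newer peers, a domino number. |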
| ByteBufferInputStream bbis = |
| new ByteBufferInputStream(nioInputBuffer); |
| DataInputStream dis = new DataInputStream(bbis); |
| if (!this.isReceiver) { |
| try { |
| this.replyCode = dis.readUnsignedByte(); |
| if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) { |
| this.asyncDistributionTimeout = dis.readInt(); |
| this.asyncQueueTimeout = dis.readInt(); |
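| // the peer sends async-max-queue-size in megabytes; convert to bytes |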
| this.asyncMaxQueueSize = (long)dis.readInt() * (1024*1024); |
| if (this.asyncDistributionTimeout != 0) { |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_0_ASYNC_CONFIGURATION_RECEIVED_1, |
| new Object[] {p2pReaderName(), |
| " asyncDistributionTimeout=" |
| + this.asyncDistributionTimeout |
| + " asyncQueueTimeout=" + this.asyncQueueTimeout |
| + " asyncMaxQueueSize=" |
| + (this.asyncMaxQueueSize / (1024*1024))})); |
| } |
| // read the product version ordinal for on-the-fly serialization |
| // transformations (for rolling upgrades) |
| this.remoteVersion = Version.readVersion(dis, true); |
| } |
| } |
| catch (Exception e) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DESERIALIZING_P2P_HANDSHAKE_REPLY), e); |
| this.readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_ERROR_DESERIALIZING_P2P_HANDSHAKE_REPLY.toLocalizedString()); |
| return; |
| } |
| catch (ThreadDeath td) { |
| throw td; |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_THROWABLE_DESERIALIZING_P2P_HANDSHAKE_REPLY), t); |
| this.readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_THROWABLE_DESERIALIZING_P2P_HANDSHAKE_REPLY.toLocalizedString()); |
| return; |
| } |
| if (this.replyCode != REPLY_CODE_OK && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) { |
| StringId err = LocalizedStrings.Connection_UNKNOWN_HANDSHAKE_REPLY_CODE_0_NIOMESSAGELENGTH_1_PROCESSORTYPE_2; |
| Object[] errArgs = new Object[] {Integer.valueOf(this.replyCode), Integer.valueOf(nioMessageLength)}; |
| if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113 |
| logger.debug(err.toLocalizedString(errArgs) + " (peer probably departed ungracefully)"); |
| } |
| else { |
| logger.fatal(LocalizedMessage.create(err, errArgs)); |
| } |
| this.readerShuttingDown = true; |
| requestClose(err.toLocalizedString(errArgs)); |
| return; |
| } |
| notifyHandshakeWaiter(true); |
| } |
| else { |
| try { |
| byte b = dis.readByte(); |
| if (b != 0) { |
| throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_OLD_VERSION_PRE_501_OF_GEMFIRE_OR_NONGEMFIRE_DURING_HANDSHAKE_DUE_TO_INITIAL_BYTE_BEING_0.toLocalizedString(Byte.valueOf(b))); |
| } |
| byte handShakeByte = dis.readByte(); |
| if (handShakeByte != HANDSHAKE_VERSION) { |
| throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {Byte.valueOf(HANDSHAKE_VERSION), Byte.valueOf(handShakeByte)})); |
| } |
| InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis); |
| Stub stub = new Stub(remote.getIpAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId()); |
| setRemoteAddr(remote, stub); |
| this.sharedResource = dis.readBoolean(); |
| this.preserveOrder = dis.readBoolean(); |
| this.uniqueId = dis.readLong(); |
| // read the product version ordinal for on-the-fly serialization |
| // transformations (for rolling upgrades) |
| this.remoteVersion = Version.readVersion(dis, true); |
| int dominoNumber = 0; |
| if (this.remoteVersion == null || |
| (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) { |
| dominoNumber = dis.readInt(); |
| if (this.sharedResource) { |
| dominoNumber = 0; |
| } |
| dominoCount.set(dominoNumber); |
| // this.senderName = dis.readUTF(); |
| } |
| if (!this.sharedResource) { |
| if (tipDomino()) { |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_THREAD_OWNED_RECEIVER_FORCING_ITSELF_TO_SEND_ON_THREAD_OWNED_SOCKETS)); |
| // bug #49565 - if domino count is >= 2 use shared resources. |
| // Also see DistributedCacheOperation#supportsDirectAck |
| } else { //if (dominoNumber < 2) { |
| ConnectionTable.threadWantsOwnResources(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets", dominoNumber); |
| } |
| // } else { |
| // ConnectionTable.threadWantsSharedResources(); |
| } |
| this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber); |
| //Because this thread is not shared resource, it will be used for direct |
| //ack. Direct ack messages can be large. This call will resize the send |
| //buffer. |
| setSendBufferSize(this.socket); |
| } |
| // String name = owner.getDM().getConfig().getName(); |
| // if (name == null) { |
| // name = "pid="+OSProcess.getId(); |
| // } |
| Thread.currentThread().setName( |
| // (!this.sharedResource && this.senderName != null? ("<"+this.senderName+"> -> ") : "") + |
| // "[" + name + "] "+ |
| "P2P message reader for " + this.remoteAddr |
| + " " + (this.sharedResource?"":"un") + "shared" |
| + " " + (this.preserveOrder?"":"un") + "ordered" |
| + " uid=" + this.uniqueId |
| + (dominoNumber>0? (" dom #" + dominoNumber) : "") |
| + " port=" + this.socket.getPort()); |
| } |
| catch (Exception e) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101 |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DESERIALIZING_P2P_HANDSHAKE_MESSAGE), e); |
| this.readerShuttingDown = true; |
| requestClose(LocalizedStrings.Connection_ERROR_DESERIALIZING_P2P_HANDSHAKE_MESSAGE.toLocalizedString()); |
| return; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("P2P handshake remoteId is {}{}", this.remoteId, |
| (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : "")); |
| } |
| try { |
| String authInit = System |
| .getProperty(DistributionConfigImpl.SECURITY_SYSTEM_PREFIX |
| + DistributionConfig.SECURITY_PEER_AUTH_INIT_NAME); |
| boolean isSecure = authInit != null && authInit.length() != 0; |
| |
| if (isSecure) { |
| if (owner.getConduit() |
| .waitForMembershipCheck(this.remoteAddr)) { |
| sendOKHandshakeReply(); // fix for bug 33224 |
| notifyHandshakeWaiter(true); |
| } |
| else { |
| // ARB: check if we need notifyHandshakeWaiter() call. |
| notifyHandshakeWaiter(false); |
| logger.warn(LocalizedMessage.create( |
| LocalizedStrings.Connection_0_TIMED_OUT_DURING_A_MEMBERSHIP_CHECK, p2pReaderName())); |
| return; |
| } |
| } |
| else { |
| sendOKHandshakeReply(); // fix for bug 33224 |
| try { |
| notifyHandshakeWaiter(true); |
| } |
| catch (Exception e) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNCAUGHT_EXCEPTION_FROM_LISTENER), e); |
| } |
| } |
| } |
| catch (IOException ex) { |
| final String err = LocalizedStrings.Connection_FAILED_SENDING_HANDSHAKE_REPLY.toLocalizedString(); |
| if (logger.isDebugEnabled()) { |
| logger.debug(err, ex); |
| } |
| this.readerShuttingDown = true; |
| requestClose(err + ": " + ex); |
| return; |
| } |
| } |
| } |
| if (!connected) { |
| continue; |
| } |
| accessed(); |
| nioInputBuffer.limit(oldLimit); |
| nioInputBuffer.position(startPos + nioMessageLength); |
| } |
| else { |
| done = true; |
| compactOrResizeBuffer(nioMessageLength); |
| } |
| } |
| else { |
| done = true; |
| if (nioInputBuffer.position() != 0) { |
| nioInputBuffer.compact(); |
| } |
| else { |
| nioInputBuffer.position(nioInputBuffer.limit()); |
| nioInputBuffer.limit(nioInputBuffer.capacity()); |
| } |
| } |
| } |
| } |
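| /** Makes room in the receive buffer for a message of the given length: |
| * if the current buffer is too small, a larger buffer is acquired and |
| * the unread bytes are copied into it; otherwise the buffer is |
| * compacted (or its window reset) so more bytes can be read. */ |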
| private void compactOrResizeBuffer(int messageLength) { |
| final int oldBufferSize = nioInputBuffer.capacity(); |
| final DMStats stats = this.owner.getConduit().stats; |
| int allocSize = messageLength+MSG_HEADER_BYTES; |
| if (oldBufferSize < allocSize) { |
| // need a bigger buffer |
| logger.info(LocalizedMessage.create( |
| LocalizedStrings.Connection_ALLOCATING_LARGER_NETWORK_READ_BUFFER_NEW_SIZE_IS_0_OLD_SIZE_WAS_1, |
| new Object[] { Integer.valueOf(allocSize), Integer.valueOf(oldBufferSize)})); |
| ByteBuffer oldBuffer = nioInputBuffer; |
| nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats); |
| |
| if(oldBuffer != null) { |
| int oldByteCount = oldBuffer.remaining(); // needed to workaround JRockit 1.4.2.04 bug |
| nioInputBuffer.put(oldBuffer); |
| nioInputBuffer.position(oldByteCount); // workaround JRockit 1.4.2.04 bug |
| Buffers.releaseReceiveBuffer(oldBuffer, stats); |
| } |
| } |
| else { |
| if (nioInputBuffer.position() != 0) { |
| nioInputBuffer.compact(); |
| } |
| else { |
| nioInputBuffer.position(nioInputBuffer.limit()); |
| nioInputBuffer.limit(nioInputBuffer.capacity()); |
| } |
| } |
| } |
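| /** Hands a fully-read message to the owning conduit for processing, |
| * first installing a DirectReplySender when a direct ack was requested. |
| * Returns true if the message was dispatched. */ |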
| private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck) { |
| try { |
| msg.setDoDecMessagesBeingReceived(true); |
| if(directAck) { |
| Assert.assertTrue(!isSharedResource(), "We were asked to send a direct reply on a shared socket"); |
| //TODO dirack we should resize the send buffer if we know this socket is used for direct ack. |
| msg.setReplySender(new DirectReplySender(this)); |
| } |
| this.owner.getConduit().messageReceived(this, msg, bytesRead); |
| return true; |
| } finally { |
| if (msg.containsRegionContentChange()) { |
| messagesReceived++; |
| } |
| } |
| } |
| |
| protected Socket getSocket() throws SocketException { |
| // fix for bug 37286 |
| Socket result = this.socket; |
| if (result == null) { |
| throw new SocketException(LocalizedStrings.Connection_SOCKET_HAS_BEEN_CLOSED.toLocalizedString()); |
| } |
| return result; |
| } |
| public boolean isSocketClosed() { |
| return this.socket.isClosed() || !this.socket.isConnected(); |
| } |
| private boolean isSocketInUse() { |
| return this.socketInUse; |
| } |
| |
| |
| protected final void accessed() { |
| this.accessed = true; |
| } |
| |
| /** returns the ConnectionKey stub representing the other side of |
| this connection (host:port) */ |
| public final Stub getRemoteId() { |
| return remoteId; |
| } |
| |
| /** return the DM id of the guy on the other side of this connection. |
| */ |
| public final InternalDistributedMember getRemoteAddress() { |
| return this.remoteAddr; |
| } |
| |
| /** |
| * Return the version of the guy on the other side of this connection. |
| */ |
| public final Version getRemoteVersion() { |
| return this.remoteVersion; |
| } |
| |
| @Override |
| public String toString() { |
| return String.valueOf(remoteAddr) + '@' + this.uniqueId |
| + (this.remoteVersion != null ? ('(' + this.remoteVersion.toString() |
| + ')') : "") /*DEBUG + " accepted=" + this.isReceiver + " connected=" + this.connected + " hash=" + System.identityHashCode(this) + " preserveOrder=" + this.preserveOrder + |
| " closing=" + isClosing() + ">"*/; |
| } |
| |
| /** |
| * answers whether this connection was initiated in this vm |
| * @return true if the connection was initiated here |
| * @since 5.1 |
| */ |
| protected boolean getOriginatedHere() { |
| return !this.isReceiver; |
| } |
| |
| /** |
| * answers whether this connection is used for ordered message delivery |
| */ |
| protected boolean getPreserveOrder() { |
| return preserveOrder; |
| } |
| |
| /** |
| * answers the unique ID of this connection in the originating VM |
| */ |
| protected long getUniqueId() { |
| return this.uniqueId; |
| } |
| |
| /** |
| * answers the number of messages received by this connection |
| */ |
| protected long getMessagesReceived() { |
| return messagesReceived; |
| } |
| |
| /** |
| * answers the number of messages sent on this connection |
| */ |
| protected long getMessagesSent() { |
| return messagesSent; |
| } |
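| /** Acquires a permit from the sender semaphore, throttling concurrent |
| * senders on this connection. Reader threads are exempt because they |
| * send replies that must not wait. */ |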
| public void acquireSendPermission() throws ConnectionException { |
| if (!this.connected) { |
| throw new ConnectionException(LocalizedStrings.Connection_CONNECTION_IS_CLOSED.toLocalizedString()); |
| } |
| if (isReaderThread()) { |
| // reader threads send replies and we always want to permit those without waiting |
| return; |
| } |
| // @todo darrel: add some stats |
| boolean interrupted = false; |
| try { |
| for (;;) { |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); |
| try { |
| this.senderSem.acquire(); |
| break; |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } |
| } // for |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| if (!this.connected) { |
| this.senderSem.release(); |
| this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101 |
| throw new ConnectionException(LocalizedStrings.Connection_CONNECTION_IS_CLOSED.toLocalizedString()); |
| } |
| } |
| public void releaseSendPermission() { |
| if (isReaderThread()) { |
| return; |
| } |
| this.senderSem.release(); |
| } |
| private void closeSenderSem() { |
| // All we need to do is increase the number of permits by one |
| // in case one or more threads are currently waiting to acquire. |
| // One of them will get the permit, find out the connection is closed, |
| // and release the permit again, so every thread currently waiting |
| // to acquire eventually completes by throwing a ConnectionException. |
| releaseSendPermission(); |
| } |
| |
| /** true once useNIO() has computed and cached its decision */ |
| boolean nioChecked; |
| /** the cached result of the useNIO() check */ |
| boolean useNIO; |
| |
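| /** Returns true if NIO should be used for this connection. The answer |
| * is computed once and cached; SSL always disables NIO, as does IPv6 on |
| * Windows (JDK bug 6230761). */ |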
| private final boolean useNIO() { |
| if (TCPConduit.useSSL) { |
| return false; |
| } |
| if (this.nioChecked) { |
| return this.useNIO; |
| } |
| this.nioChecked = true; |
| this.useNIO = this.owner.getConduit().useNIO(); |
| if (!this.useNIO) { |
| return false; |
| } |
| // JDK bug 6230761 - NIO can't be used with IPv6 on Windows |
| if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) { |
| String os = System.getProperty("os.name"); |
| if (os != null) { |
| if (os.indexOf("Windows") != -1) { |
| this.useNIO = false; |
| } |
| } |
| } |
| return this.useNIO; |
| } |
| } |