| /* |
| * Copyright 1999,2005 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.catalina.cluster.tcp; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.InetAddress; |
| import java.net.Socket; |
| import java.net.SocketException; |
| |
| import org.apache.catalina.cluster.ClusterMessage; |
| import org.apache.catalina.cluster.io.XByteBuffer; |
| import org.apache.catalina.util.StringManager; |
| |
| /** |
| * Send cluster messages with only one socket. Ack and keep Alive Handling is |
| * supported |
| * |
| * @author Peter Rossbach |
| * @author Filip Hanik |
| * @version $Revision$ $Date$ |
| * @since 5.5.7 |
| */ |
| public class DataSender implements IDataSender { |
| |
| private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory |
| .getLog(DataSender.class); |
| |
| /** |
| * The string manager for this package. |
| */ |
| protected static StringManager sm = StringManager |
| .getManager(Constants.Package); |
| |
| // ----------------------------------------------------- Instance Variables |
| |
| /** |
| * The descriptive information about this implementation. |
| */ |
| private static final String info = "DataSender/2.1"; |
| |
| /** |
| * receiver address |
| */ |
| private InetAddress address; |
| |
| /** |
| * receiver port |
| */ |
| private int port; |
| |
| |
| /** |
| * cluster domain |
| */ |
| private String domain; |
| |
| /** |
| * current sender socket |
| */ |
| private Socket socket = null; |
| |
| /** |
| * is Socket really connected |
| */ |
| private boolean isSocketConnected = false; |
| |
| /** |
| * Message transfer over socket ? |
| */ |
| private boolean isMessageTransferStarted = false; |
| |
| /** |
| * sender is in suspect state (last transfer failed) |
| */ |
| private SenderState senderState = new SenderState(); |
| |
| /** |
| * wait time for ack |
| */ |
| private long ackTimeout; |
| |
| /** |
| * number of requests |
| */ |
| protected long nrOfRequests = 0; |
| |
| /** |
| * total bytes to transfer |
| */ |
| protected long totalBytes = 0; |
| |
| /** |
| * number of connects |
| */ |
| protected long connectCounter = 0; |
| |
| /** |
| * number of explizit disconnects |
| */ |
| protected long disconnectCounter = 0; |
| |
| /** |
| * number of failing acks |
| */ |
| protected long missingAckCounter = 0; |
| |
| /** |
| * number of data resends (second trys after socket failure) |
| */ |
| protected long dataResendCounter = 0; |
| |
| /** |
| * number of data failure sends |
| */ |
| protected long dataFailureCounter = 0; |
| |
| /** |
| * doProcessingStats |
| */ |
| protected boolean doProcessingStats = false; |
| |
| /** |
| * proessingTime |
| */ |
| protected long processingTime = 0; |
| |
| /** |
| * min proessingTime |
| */ |
| protected long minProcessingTime = Long.MAX_VALUE ; |
| |
| /** |
| * max proessingTime |
| */ |
| protected long maxProcessingTime = 0; |
| |
| /** |
| * doWaitAckStats |
| */ |
| protected boolean doWaitAckStats = false; |
| |
| /** |
| * waitAckTime |
| */ |
| protected long waitAckTime = 0; |
| |
| /** |
| * min waitAckTime |
| */ |
| protected long minWaitAckTime = Long.MAX_VALUE ; |
| |
| /** |
| * max waitAckTime |
| */ |
| protected long maxWaitAckTime = 0; |
| |
| /** |
| * keep socket open for no more than one min |
| */ |
| private long keepAliveTimeout = 60 * 1000; |
| |
| /** |
| * max requests before reconnecting (default -1 unlimited) |
| */ |
| private int keepAliveMaxRequestCount = -1; |
| |
| /** |
| * Last connect timestamp |
| */ |
| protected long keepAliveConnectTime = 0; |
| |
| /** |
| * keepalive counter |
| */ |
| protected int keepAliveCount = 0; |
| |
| /** |
| * wait for receiver Ack |
| */ |
| private boolean waitForAck = false; |
| |
| /** |
| * number of socket close |
| */ |
| private int socketCloseCounter = 0 ; |
| |
| /** |
| * number of socket open |
| */ |
| private int socketOpenCounter = 0 ; |
| |
| /** |
| * number of socket open failures |
| */ |
| private int socketOpenFailureCounter = 0 ; |
| |
| /** |
| * After failure make a resend |
| */ |
| private boolean resend = false ; |
| |
| // ------------------------------------------------------------- Constructor |
| |
| public DataSender(String domain,InetAddress host, int port) { |
| this.address = host; |
| this.port = port; |
| this.domain = domain; |
| if (log.isDebugEnabled()) |
| log.debug(sm.getString("IDataSender.create",address, new Integer(port))); |
| } |
| |
| public DataSender(String domain,InetAddress host, int port, SenderState state) { |
| this(domain,host,port); |
| if ( state != null ) this.senderState = state; |
| } |
| // ------------------------------------------------------------- Properties |
| |
| /** |
| * Return descriptive information about this implementation and the |
| * corresponding version number, in the format |
| * <code><description>/<version></code>. |
| */ |
| public String getInfo() { |
| |
| return (info); |
| |
| } |
| |
| /** |
| * @return Returns the nrOfRequests. |
| */ |
| public long getNrOfRequests() { |
| return nrOfRequests; |
| } |
| |
| /** |
| * @return Returns the totalBytes. |
| */ |
| public long getTotalBytes() { |
| return totalBytes; |
| } |
| |
| /** |
| * @return Returns the avg totalBytes/nrOfRequests. |
| */ |
| public long getAvgMessageSize() { |
| if ( nrOfRequests > 0 ) { |
| return totalBytes / nrOfRequests; |
| } else { |
| return 0; |
| } |
| } |
| |
| /** |
| * @return Returns the avg processingTime/nrOfRequests. |
| */ |
| public double getAvgProcessingTime() { |
| if ( nrOfRequests > 0 ) { |
| return ((double)processingTime) / nrOfRequests; |
| } else { |
| return 0; |
| } |
| } |
| |
| /** |
| * @return Returns the maxProcessingTime. |
| */ |
| public long getMaxProcessingTime() { |
| return maxProcessingTime; |
| } |
| |
| /** |
| * @return Returns the minProcessingTime. |
| */ |
| public long getMinProcessingTime() { |
| return minProcessingTime; |
| } |
| |
| /** |
| * @return Returns the processingTime. |
| */ |
| public long getProcessingTime() { |
| return processingTime; |
| } |
| |
| /** |
| * @return Returns the doProcessingStats. |
| */ |
| public boolean isDoProcessingStats() { |
| return doProcessingStats; |
| } |
| |
| /** |
| * @param doProcessingStats The doProcessingStats to set. |
| */ |
| public void setDoProcessingStats(boolean doProcessingStats) { |
| this.doProcessingStats = doProcessingStats; |
| } |
| |
| |
| /** |
| * @return Returns the doWaitAckStats. |
| */ |
| public boolean isDoWaitAckStats() { |
| return doWaitAckStats; |
| } |
| |
| /** |
| * @param doWaitAckStats The doWaitAckStats to set. |
| */ |
| public void setDoWaitAckStats(boolean doWaitAckStats) { |
| this.doWaitAckStats = doWaitAckStats; |
| } |
| |
| /** |
| * @return Returns the avg waitAckTime/nrOfRequests. |
| */ |
| public double getAvgWaitAckTime() { |
| if ( nrOfRequests > 0 ) { |
| return ((double)waitAckTime) / nrOfRequests; |
| } else { |
| return 0; |
| } |
| } |
| |
| /** |
| * @return Returns the maxWaitAckTime. |
| */ |
| public long getMaxWaitAckTime() { |
| return maxWaitAckTime; |
| } |
| |
| /** |
| * @return Returns the minWaitAckTime. |
| */ |
| public long getMinWaitAckTime() { |
| return minWaitAckTime; |
| } |
| |
| /** |
| * @return Returns the waitAckTime. |
| */ |
| public long getWaitAckTime() { |
| return waitAckTime; |
| } |
| |
| /** |
| * @return Returns the connectCounter. |
| */ |
| public long getConnectCounter() { |
| return connectCounter; |
| } |
| |
| /** |
| * @return Returns the disconnectCounter. |
| */ |
| public long getDisconnectCounter() { |
| return disconnectCounter; |
| } |
| |
| /** |
| * @return Returns the missingAckCounter. |
| */ |
| public long getMissingAckCounter() { |
| return missingAckCounter; |
| } |
| |
| /** |
| * @return Returns the socketOpenCounter. |
| */ |
| public int getSocketOpenCounter() { |
| return socketOpenCounter; |
| } |
| |
| /** |
| * @return Returns the socketOpenFailureCounter. |
| */ |
| public int getSocketOpenFailureCounter() { |
| return socketOpenFailureCounter; |
| } |
| |
| /** |
| * @return Returns the socketCloseCounter. |
| */ |
| public int getSocketCloseCounter() { |
| return socketCloseCounter; |
| } |
| |
| /** |
| * @return Returns the dataResendCounter. |
| */ |
| public long getDataResendCounter() { |
| return dataResendCounter; |
| } |
| |
| /** |
| * @return Returns the dataFailureCounter. |
| */ |
| public long getDataFailureCounter() { |
| return dataFailureCounter; |
| } |
| |
| /** |
| * @param address The address to set. |
| */ |
| public void setAddress(InetAddress address) { |
| this.address = address; |
| } |
| |
| public InetAddress getAddress() { |
| return address; |
| } |
| |
| |
| /** |
| * @param port The port to set. |
| */ |
| public void setPort(int port) { |
| this.port = port; |
| } |
| |
| public int getPort() { |
| return port; |
| } |
| |
| /** |
| * @return Returns the domain. |
| */ |
| public String getDomain() { |
| return domain; |
| } |
| |
| /** |
| * @param domain The domain to set. |
| */ |
| public void setDomain(String domain) { |
| this.domain = domain; |
| } |
| |
| public boolean isConnected() { |
| return isSocketConnected; |
| } |
| |
| /** |
| * @return Is DataSender send a message |
| */ |
| public boolean isMessageTransferStarted() { |
| return isMessageTransferStarted; |
| } |
| |
| /** |
| * @param isSocketConnected |
| * The isSocketConnected to set. |
| */ |
| protected void setSocketConnected(boolean isSocketConnected) { |
| this.isSocketConnected = isSocketConnected; |
| } |
| |
| public boolean isSuspect() { |
| return senderState.isSuspect() || senderState.isFailing(); |
| } |
| |
| public boolean getSuspect() { |
| return isSuspect(); |
| } |
| |
| public void setSuspect(boolean suspect) { |
| if ( suspect ) |
| this.senderState.setSuspect(); |
| else |
| this.senderState.setReady(); |
| } |
| |
| public long getAckTimeout() { |
| return ackTimeout; |
| } |
| |
| public void setAckTimeout(long ackTimeout) { |
| this.ackTimeout = ackTimeout; |
| } |
| |
| public long getKeepAliveTimeout() { |
| return keepAliveTimeout; |
| } |
| |
| public void setKeepAliveTimeout(long keepAliveTimeout) { |
| this.keepAliveTimeout = keepAliveTimeout; |
| } |
| |
| public int getKeepAliveMaxRequestCount() { |
| return keepAliveMaxRequestCount; |
| } |
| |
| public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { |
| this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; |
| } |
| |
| /** |
| * @return Returns the keepAliveConnectTime. |
| */ |
| public long getKeepAliveConnectTime() { |
| return keepAliveConnectTime; |
| } |
| |
| /** |
| * @return Returns the keepAliveCount. |
| */ |
| public int getKeepAliveCount() { |
| return keepAliveCount; |
| } |
| |
| /** |
| * @return Returns the waitForAck. |
| */ |
| public boolean isWaitForAck() { |
| return waitForAck; |
| } |
| |
| /** |
| * @param waitForAck |
| * The waitForAck to set. |
| */ |
| public void setWaitForAck(boolean waitForAck) { |
| this.waitForAck = waitForAck; |
| } |
| |
| /** |
| * @return Returns the resend. |
| */ |
| public boolean isResend() { |
| return resend; |
| } |
| /** |
| * @param resend The resend to set. |
| */ |
| public void setResend(boolean resend) { |
| this.resend = resend; |
| } |
| /** |
| * @return Returns the socket. |
| */ |
| public Socket getSocket() { |
| return socket; |
| } |
| |
| public SenderState getSenderState() { |
| return senderState; |
| } |
| |
| /** |
| * @param socket The socket to set. |
| */ |
| public void setSocket(Socket socket) { |
| this.socket = socket; |
| } |
| // --------------------------------------------------------- Public Methods |
| |
| /** |
| * Connect other cluster member receiver |
| * @see org.apache.catalina.cluster.tcp.IDataSender#connect() |
| */ |
| public synchronized void connect() throws java.io.IOException { |
| if(!isMessageTransferStarted) { |
| openSocket(); |
| if(isConnected()) { |
| connectCounter++; |
| if (log.isDebugEnabled()) |
| log.debug(sm.getString("IDataSender.connect", address.getHostAddress(), |
| new Integer(port),new Long(connectCounter))); |
| } |
| } else |
| if (log.isWarnEnabled()) |
| log.warn(sm.getString("IDataSender.message.create", address.getHostAddress(),new Integer(port))); |
| } |
| |
| |
| /** |
| * disconnect and close socket |
| * |
| * @see IDataSender#disconnect() |
| */ |
| public synchronized void disconnect() { |
| if(!isMessageTransferStarted) { |
| boolean connect = isConnected() ; |
| closeSocket(); |
| if(connect) { |
| disconnectCounter++; |
| if (log.isDebugEnabled()) |
| log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(), |
| new Integer(port),new Long(disconnectCounter))); |
| } |
| } else |
| if (log.isWarnEnabled()) |
| log.warn(sm.getString("IDataSender.message.disconnect", address.getHostAddress(),new Integer(port))); |
| |
| } |
| |
| /** |
| * Check, if time to close socket! Important for AsyncSocketSender that |
| * replication thread is not fork again! <b>Only work when keepAliveTimeout |
| * or keepAliveMaxRequestCount greater -1 </b> |
| * FIXME Can we close a socket when a message wait for ack? |
| * @return true, is socket close |
| * @see DataSender#closeSocket() |
| */ |
| public synchronized boolean checkKeepAlive() { |
| boolean isCloseSocket = true ; |
| if(!isMessageTransferStarted) { |
| if(isConnected()) { |
| if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout) |
| || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) { |
| closeSocket(); |
| } else |
| isCloseSocket = false ; |
| } |
| } else |
| isCloseSocket = false ; |
| |
| return isCloseSocket; |
| } |
| |
| /** |
| * Send message |
| * |
| * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(, |
| * ClusterData) |
| */ |
| public synchronized void sendMessage(ClusterData data) |
| throws java.io.IOException { |
| pushMessage(data); |
| } |
| |
| /** |
| * Reset sender statistics |
| */ |
| public synchronized void resetStatistics() { |
| nrOfRequests = 0; |
| totalBytes = 0; |
| disconnectCounter = 0; |
| connectCounter = isConnected() ? 1 : 0; |
| missingAckCounter = 0; |
| dataResendCounter = 0; |
| dataFailureCounter = 0 ; |
| socketOpenCounter =isConnected() ? 1 : 0; |
| socketOpenFailureCounter = 0 ; |
| socketCloseCounter = 0; |
| processingTime = 0 ; |
| minProcessingTime = Long.MAX_VALUE ; |
| maxProcessingTime = 0 ; |
| waitAckTime = 0 ; |
| minWaitAckTime = Long.MAX_VALUE ; |
| maxWaitAckTime = 0 ; |
| } |
| |
| /** |
| * Name of this SockerSender |
| */ |
| public String toString() { |
| StringBuffer buf = new StringBuffer("DataSender["); |
| buf.append(getAddress()).append(":").append(getPort()).append("]"); |
| return buf.toString(); |
| } |
| |
| // --------------------------------------------------------- Protected Methods |
| |
| /** |
| * open real socket and set time out when waitForAck is enabled |
| * is socket open return directly |
| * @throws IOException |
| * @throws SocketException |
| */ |
| protected void openSocket() throws IOException, SocketException { |
| if(isConnected()) |
| return ; |
| try { |
| createSocket(); |
| if (isWaitForAck()) |
| socket.setSoTimeout((int) ackTimeout); |
| isSocketConnected = true; |
| socketOpenCounter++; |
| this.keepAliveCount = 0; |
| this.keepAliveConnectTime = System.currentTimeMillis(); |
| if (log.isDebugEnabled()) |
| log.debug(sm.getString("IDataSender.openSocket", address |
| .getHostAddress(), new Integer(port),new Long(socketOpenCounter))); |
| } catch (IOException ex1) { |
| socketOpenFailureCounter++ ; |
| if (log.isDebugEnabled()) |
| log.debug(sm.getString("IDataSender.openSocket.failure", |
| address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1); |
| throw ex1; |
| } |
| |
| } |
| |
| /** |
| * @throws IOException |
| * @throws SocketException |
| */ |
| protected void createSocket() throws IOException, SocketException { |
| socket = new Socket(getAddress(), getPort()); |
| } |
| |
| /** |
| * close socket |
| * |
| * @see DataSender#disconnect() |
| * @see DataSender#closeSocket() |
| */ |
| protected void closeSocket() { |
| if(isConnected()) { |
| if (socket != null) { |
| try { |
| socket.close(); |
| } catch (IOException x) { |
| } finally { |
| socket = null; |
| } |
| } |
| this.keepAliveCount = 0; |
| isSocketConnected = false; |
| socketCloseCounter++; |
| if (log.isDebugEnabled()) |
| log.debug(sm.getString("IDataSender.closeSocket", |
| address.getHostAddress(), new Integer(port),new Long(socketCloseCounter))); |
| } |
| } |
| |
| /** |
| * Add statistic for this socket instance |
| * |
| * @param length |
| */ |
| protected void addStats(int length) { |
| nrOfRequests++; |
| totalBytes += length; |
| if (log.isInfoEnabled() && (nrOfRequests % 1000) == 0) { |
| log.info(sm.getString("IDataSender.stats", new Object[] { |
| getAddress().getHostAddress(), new Integer(getPort()), |
| new Long(totalBytes), new Long(nrOfRequests), |
| new Long(totalBytes / nrOfRequests), |
| new Long(getProcessingTime()), |
| new Double(getAvgProcessingTime())})); |
| } |
| } |
| |
| /** |
| * Add processing stats times |
| * @param startTime |
| */ |
| protected void addProcessingStats(long startTime) { |
| long time = System.currentTimeMillis() - startTime ; |
| if(time < minProcessingTime) |
| minProcessingTime = time ; |
| if( time > maxProcessingTime) |
| maxProcessingTime = time ; |
| processingTime += time ; |
| } |
| |
| /** |
| * Add waitAck stats times |
| * @param startTime |
| */ |
| protected void addWaitAckStats(long startTime) { |
| long time = System.currentTimeMillis() - startTime ; |
| if(time < minWaitAckTime) |
| minWaitAckTime = time ; |
| if( time > maxWaitAckTime) |
| maxWaitAckTime = time ; |
| waitAckTime += time ; |
| } |
| /** |
| * Push messages with only one socket at a time |
| * Wait for ack is needed and make auto retry when write message is failed. |
| * After sending error close and reopen socket again. |
| * |
| * After successfull sending update stats |
| * |
| * WARNING: Subclasses must be very carefull that only one thread call this pushMessage at once!!! |
| * |
| * @see #closeSocket() |
| * @see #openSocket() |
| * @see #writeData(ClusterData) |
| * |
| * @param data |
| * data to send |
| * @throws java.io.IOException |
| * @since 5.5.10 |
| */ |
| protected void pushMessage( ClusterData data) |
| throws java.io.IOException { |
| long time = 0 ; |
| if(doProcessingStats) { |
| time = System.currentTimeMillis(); |
| } |
| boolean messageTransfered = false ; |
| synchronized(this) { |
| checkKeepAlive(); |
| if (!isConnected()) |
| openSocket(); |
| else if(keepAliveTimeout > -1) |
| this.keepAliveConnectTime = System.currentTimeMillis(); |
| } |
| IOException exception = null; |
| try { |
| writeData(data); |
| messageTransfered = true ; |
| } catch (java.io.IOException x) { |
| if(data.getResend() == ClusterMessage.FLAG_ALLOWED || |
| (data.getResend() == ClusterMessage.FLAG_DEFAULT && isResend() )) { |
| // second try with fresh connection |
| dataResendCounter++; |
| if (log.isTraceEnabled()) |
| log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(), |
| new Integer(port)),x); |
| synchronized(this) { |
| closeSocket(); |
| openSocket(); |
| } |
| try { |
| writeData(data); |
| messageTransfered = true; |
| } catch (IOException xx) { |
| exception = xx; |
| throw xx ; |
| } |
| } else { |
| synchronized(this) { |
| closeSocket(); |
| } |
| exception = x; |
| throw x ; |
| } |
| } finally { |
| this.keepAliveCount++; |
| checkKeepAlive(); |
| if(doProcessingStats) { |
| addProcessingStats(time); |
| } |
| if(messageTransfered) { |
| addStats(data.getMessage().length); |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(), |
| new Integer(port), data.getUniqueId(), new Long(data.getMessage().length))); |
| } |
| } else { |
| dataFailureCounter++; |
| throw exception; |
| } |
| } |
| } |
| |
| /** |
| * Sent real cluster Message to socket stream |
| * FIXME send compress |
| * @param data |
| * @throws IOException |
| * @since 5.5.10 |
| */ |
| protected void writeData(ClusterData data) throws IOException { |
| synchronized(this) { |
| isMessageTransferStarted = true ; |
| } |
| try { |
| byte[] message = data.getMessage(); |
| OutputStream out = socket.getOutputStream(); |
| out.write(XByteBuffer.createDataPackage(message,data.getCompress())); |
| out.flush(); |
| if (isWaitForAck()) |
| waitForAck(ackTimeout); |
| } finally { |
| synchronized(this) { |
| isMessageTransferStarted = false ; |
| } |
| } |
| } |
| |
| /** |
| * Wait for Acknowledgement from other server |
| * FIXME Please, not wait only for three charcters, better control that the wait ack message is correct. |
| * @param timeout |
| * @throws java.io.IOException |
| * @throws java.net.SocketTimeoutException |
| */ |
| protected void waitForAck(long timeout) throws java.io.IOException { |
| long time = 0 ; |
| if(doWaitAckStats) { |
| time = System.currentTimeMillis(); |
| } |
| try { |
| int bytesRead = 0; |
| if ( log.isTraceEnabled() ) |
| log.trace(sm.getString("IDataSender.ack.start",getAddress(), new Integer(socket.getLocalPort()))); |
| int i = socket.getInputStream().read(); |
| while ((i != -1) && (i != 3) && bytesRead < 10) { |
| if ( log.isTraceEnabled() ) |
| log.trace(sm.getString("IDataSender.ack.read",getAddress(), new Integer(socket.getLocalPort()),new Character((char) i))); |
| bytesRead++; |
| i = socket.getInputStream().read(); |
| } |
| if (i != 3) { |
| if (i == -1) { |
| throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort()))); |
| } else { |
| throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort()))); |
| } |
| } else { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("IDataSender.ack.receive", getAddress(),new Integer(socket.getLocalPort()))); |
| } |
| } |
| } catch (IOException x) { |
| missingAckCounter++; |
| String errmsg = sm.getString("IDataSender.ack.missing", getAddress(), |
| new Integer(socket.getLocalPort()), |
| new Long(this.ackTimeout)); |
| if ( !this.isSuspect() ) { |
| this.setSuspect(true); |
| if ( log.isWarnEnabled() ) log.warn(errmsg, x); |
| } else { |
| if ( log.isDebugEnabled() )log.debug(errmsg, x); |
| } |
| throw x; |
| } finally { |
| if(doWaitAckStats) { |
| addWaitAckStats(time); |
| } |
| } |
| } |
| } |