blob: c2332e061179f84db7e21b51a98c115885ad1f30 [file] [log] [blame]
/*
* Copyright 1999,2004-2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.catalina.cluster.tcp;
import java.net.InetAddress;
import org.apache.catalina.cluster.util.FastQueue;
import org.apache.catalina.cluster.util.LinkObject;
import org.apache.catalina.cluster.util.IQueue;
/**
* Send cluster messages from a Message queue with only one socket. Ack and keep
* Alive Handling is supported. Fast Queue can limit queue size and consume all messages at queue at one block.<br/>
* Limit the queue lock contention under high load!<br/>
* <ul>
* <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
* sender and all messages are queued. Only use this for small maintaince
* isuses!</li>
* <li>waitForAck=true, means that receiver ack the transfer</li>
* <li>after one minute idle time, or number of request (100) the connection is
* reconnected with next request. Change this for production use!</li>
* <li>default ackTimeout is 15 sec: this is very low for big all session
* replication messages after restart a node</li>
* <li>disable keepAlive: keepAliveTimeout="-1" and
* keepAliveMaxRequestCount="-1"</li>
* <li>maxQueueLength: Limit the sender queue length (membership goes well, but transfer is failure!!)</li>
* </ul>
* FIXME: refactor code duplications with AsyncSocketSender => configurable or extract super class
* @author Peter Rossbach ( idea comes form Rainer Jung)
* @version $Revision$ $Date$
* @since 5.5.9
*/
public class FastAsyncSocketSender extends DataSender {
private static int threadCounter = 1;
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(FastAsyncSocketSender.class);
/**
* The descriptive information about this implementation.
*/
private static final String info = "FastAsyncSocketSender/3.1";
// ----------------------------------------------------- Instance Variables
/**
* Message Queue
*/
private FastQueue queue = new FastQueue();
/**
* Active thread to push messages asynchronous to the other replication node
*/
private FastQueueThread queueThread = null;
/**
* recover timeout ( default 5 secs)
*/
private long recoverTimeout = 5000;
/**
* number of recover tries
*/
private int recoverCounter = 0;
/**
* Count number of queue message
*/
private long inQueueCounter = 0;
/**
* Count all successfull push messages from queue
*/
private long outQueueCounter = 0;
private int threadPriority = Thread.NORM_PRIORITY;;
// ------------------------------------------------------------- Constructor
/**
* start background thread to push incomming cluster messages to replication
* node
*
* @param domain replication cluster domain (session domain)
* @param host replication node tcp address
* @param port replication node tcp port
*/
public FastAsyncSocketSender(String domain,InetAddress host, int port) {
super(domain,host, port);
checkThread();
}
// ------------------------------------------------------------- Properties
/**
* Return descriptive information about this implementation and the
* corresponding version number, in the format
* <code>&lt;description&gt;/&lt;version&gt;</code>.
*/
public String getInfo() {
return (info);
}
/**
* get current add wait timeout
* @return current wait timeout
*/
public long getQueueAddWaitTimeout() {
return queue.getAddWaitTimeout();
}
/**
* Set add wait timeout (default 10000 msec)
* @param timeout
*/
public void setQueueAddWaitTimeout(long timeout) {
queue.setAddWaitTimeout(timeout);
}
/**
* get current remove wait timeout
* @return The timeout
*/
public long getQueueRemoveWaitTimeout() {
return queue.getRemoveWaitTimeout();
}
/**
* set remove wait timeout ( default 30000 msec)
* @param timeout
*/
public void setRemoveWaitTimeout(long timeout) {
queue.setRemoveWaitTimeout(timeout);
}
/**
* @return Returns the checkLock.
*/
public boolean isQueueCheckLock() {
return queue.isCheckLock();
}
/**
* @param checkLock The checkLock to set.
*/
public void setQueueCheckLock(boolean checkLock) {
queue.setCheckLock(checkLock);
}
/**
* @return Returns the doStats.
*/
public boolean isQueueDoStats() {
return queue.isDoStats();
}
/**
* @param doStats The doStats to set.
*/
public void setQueueDoStats(boolean doStats) {
queue.setDoStats(doStats);
}
/**
* @return Returns the timeWait.
*/
public boolean isQueueTimeWait() {
return queue.isTimeWait();
}
/**
* @param timeWait The timeWait to set.
*/
public void setQueueTimeWait(boolean timeWait) {
queue.setTimeWait(timeWait);
}
/**
* @return Returns the inQueueCounter.
*/
public int getMaxQueueLength() {
return queue.getMaxQueueLength();
}
/**
* @param length max queue length
*/
public void setMaxQueueLength(int length) {
queue.setMaxQueueLength(length);
}
/**
* @return Returns the add wait times.
*/
public long getQueueAddWaitTime() {
return queue.getAddWait();
}
/**
* @return Returns the add wait times.
*/
public long getQueueRemoveWaitTime() {
return queue.getRemoveWait();
}
/**
* @return Returns the inQueueCounter.
*/
public long getInQueueCounter() {
return inQueueCounter;
}
/**
* @return Returns the outQueueCounter.
*/
public long getOutQueueCounter() {
return outQueueCounter;
}
/**
* @return Returns the queueSize.
*/
public int getQueueSize() {
return queue.getSize();
}
/**
* get current push message recover timeout
* @return current push message recover timeout
*/
public long getRecoverTimeout() {
return recoverTimeout;
}
/**
* Set recover timeout (default 5000 msec)
* @param timeout
*/
public void setRecoverTimeout(long timeout) {
recoverTimeout = timeout;
}
/**
* get current push message recover counter
* @return current push message recover counter
*/
public int getRecoverCounter() {
return recoverCounter;
}
/**
* Set recover couner (default 5 )
* @param counter
*/
public void setRecoverCounter(int counter) {
recoverCounter = counter;
}
/**
* change active the queue Thread priority
* @param threadPriority value must be between MIN and MAX Thread Priority
* @exception IllegalArgumentException
*/
public void setThreadPriority(int threadPriority) {
if (log.isDebugEnabled())
log.debug(sm.getString("FastAsyncSocketSender.setThreadPriority",
getAddress().getHostAddress(), new Integer(getPort()),
new Integer(threadPriority)));
if (threadPriority < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException(sm.getString(
"FastAsyncSocketSender.min.exception", getAddress()
.getHostAddress(), new Integer(getPort()),
new Integer(threadPriority)));
} else if (threadPriority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(sm.getString(
"FastAsyncSocketSender.max.exception", getAddress()
.getHostAddress(), new Integer(getPort()),
new Integer(threadPriority)));
}
this.threadPriority = threadPriority;
if (queueThread != null)
queueThread.setPriority(threadPriority);
}
/**
* Get the current threadPriority
* @return The thread priority
*/
public int getThreadPriority() {
return threadPriority;
}
/**
* @return Returns the queuedNrOfBytes.
*/
public long getQueuedNrOfBytes() {
if(queueThread != null)
return queueThread.getQueuedNrOfBytes();
return 0l ;
}
// --------------------------------------------------------- Public Methods
/**
* Connect to socket and start background thread to push queued messages
*
* @see org.apache.catalina.cluster.tcp.IDataSender#connect()
*/
public void connect() throws java.io.IOException {
super.connect();
checkThread();
if(!queue.isEnabled())
queue.start() ;
}
/**
* Disconnect socket ad stop queue thread
*
* @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
*/
public void disconnect() {
stopThread();
// delete "remove" lock at queue
queue.stop() ;
// enable that sendMessage can add new messages
queue.start() ;
// close socket
super.disconnect();
}
/**
* Send message to queue for later sending.
*
* @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
*/
public void sendMessage(ClusterData data)
throws java.io.IOException {
queue.add(data.getUniqueId(), data);
synchronized (this) {
inQueueCounter++;
if(queueThread != null)
queueThread.incQueuedNrOfBytes(data.getMessage().length);
}
if (log.isTraceEnabled())
log.trace(sm.getString("AsyncSocketSender.queue.message",
getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
data.getMessage().length)));
}
/**
* Reset sender statistics
*/
public synchronized void resetStatistics() {
super.resetStatistics();
inQueueCounter = queue.getSize();
outQueueCounter = 0;
queue.resetStatistics();
}
/**
* Name of this SockerSender
*/
public String toString() {
StringBuffer buf = new StringBuffer("FastAsyncSocketSender[");
buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]");
return buf.toString();
}
// --------------------------------------------------------- Public Methods
/**
* Start Queue thread as daemon
*/
protected void checkThread() {
if (queueThread == null) {
if (log.isInfoEnabled())
log.info(sm.getString("AsyncSocketSender.create.thread",
getAddress(), new Integer(getPort())));
queueThread = new FastQueueThread(this, queue);
queueThread.setDaemon(true);
queueThread.setPriority(getThreadPriority());
queueThread.start();
}
}
/**
* stop queue worker thread
*/
protected void stopThread() {
if (queueThread != null) {
queueThread.stopRunning();
queueThread = null;
}
}
// -------------------------------------------------------- Inner Class
private class FastQueueThread extends Thread {
/**
* Sender queue
*/
private IQueue queue = null;
/**
* Active sender
*/
private FastAsyncSocketSender sender = null;
/**
* Thread is active
*/
private boolean keepRunning = true;
/**
* Current number of bytes from all queued messages
*/
private long queuedNrOfBytes = 0;
/**
* Only use inside FastAsyncSocketSender
* @param sender
* @param queue
*/
private FastQueueThread(FastAsyncSocketSender sender, IQueue queue) {
setName("Cluster-FastAsyncSocketSender-" + (threadCounter++));
this.queue = queue;
this.sender = sender;
}
/**
* @return Returns the queuedNrOfBytes.
*/
public long getQueuedNrOfBytes() {
return queuedNrOfBytes;
}
protected synchronized void setQueuedNrOfBytes(long queuedNrOfBytes) {
this.queuedNrOfBytes = queuedNrOfBytes;
}
protected synchronized void incQueuedNrOfBytes(long size) {
queuedNrOfBytes += size;
}
protected synchronized void decQueuedNrOfBytes(long size) {
queuedNrOfBytes -= size;
}
/**
* Stop backend thread!
*/
public void stopRunning() {
keepRunning = false;
}
/**
* Get the objects from queue and send all mesages to the sender.
* @see java.lang.Runnable#run()
*/
public void run() {
while (keepRunning) {
LinkObject entry = getQueuedMessage();
if (entry != null) {
pushQueuedMessages(entry);
} else {
if (keepRunning) {
log.warn(sm.getString("AsyncSocketSender.queue.empty",
sender.getAddress(), new Integer(sender
.getPort())));
}
}
}
}
/**
* Get List of queue cluster messages
* @return list of cluster messages
*/
protected LinkObject getQueuedMessage() {
// get a link list of all queued objects
if (log.isTraceEnabled())
log.trace("Queuesize before=" + ((FastQueue) queue).getSize());
LinkObject entry = queue.remove();
if (log.isTraceEnabled())
log.trace("Queuesize after=" + ((FastQueue) queue).getSize());
return entry;
}
/**
* Push all messages from queue to other nodes. Is revovery configured
* make a resends with some waits.
* @param entry list of messages
*/
protected void pushQueuedMessages(LinkObject entry) {
do {
int messagesize = 0;
ClusterData data = null ;
try {
data = (ClusterData) entry.data();
messagesize = data.getMessage().length;
sender.pushMessage(data);
} catch (Exception x) {
long rTimeout = sender.getRecoverTimeout() ;
int rCounter = sender.getRecoverCounter() ;
if(data != null &&
rTimeout > 0 &&
rCounter > 0) {
// wait that network get stabler
int counter = 1;
boolean success = false ;
do {
try {
Thread.sleep(rTimeout*counter);
} catch (Exception sleep) {
}
try {
if(log.isDebugEnabled()) {
log.debug(sm.getString("AsyncSocketSender.send.recover",
entry.getKey(),
new Integer(counter),
new Integer(rCounter), new Long(rTimeout))) ;
}
sender.pushMessage(data);
success = true;
} catch (Exception xx) {
counter++;
}
} while (keepRunning && !success && counter <= rCounter);
if(!success) {
log.warn(sm.getString(
"AsyncSocketSender.send.error", entry
.getKey()), x);
}
} else {
log.warn(sm.getString(
"AsyncSocketSender.send.error", entry
.getKey()), x);
}
} finally {
outQueueCounter++;
decQueuedNrOfBytes(messagesize);
}
entry = entry.next();
} while (keepRunning && entry != null);
}
}
}