blob: cca6926bd078cdf6e0b13c6ce04e202ee43cd4cf [file] [log] [blame]
/*
* Copyright 1999,2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.cluster.tcp;
import org.apache.catalina.cluster.io.XByteBuffer;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.ClusterSender;
public class ReplicationTransmitter implements ClusterSender
{
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( ReplicationTransmitter.class );
private java.util.HashMap map = new java.util.HashMap();
public ReplicationTransmitter()
{
}
private static long nrOfRequests = 0;
private static long totalBytes = 0;
private String replicationMode;
private static synchronized void addStats(int length) {
nrOfRequests++;
totalBytes+=length;
if ( (nrOfRequests % 100) == 0 ) {
log.info("Nr of bytes sent="+totalBytes+" over "+nrOfRequests+" =="+(totalBytes/nrOfRequests)+" bytes/request");
}
}
public void setReplicationMode(String mode) {
String msg = IDataSenderFactory.validateMode(mode);
if (msg == null) {
log.debug("Setting replcation mode to " + mode);
this.replicationMode = mode;
}
else
throw new IllegalArgumentException(msg);
}
public synchronized void add(Member member)
{
try {
IDataSender sender = IDataSenderFactory.getIDataSender(
replicationMode, member);
String key = sender.getAddress().getHostAddress() + ":" +
sender.getPort();
if (!map.containsKey(key))
map.put(sender.getAddress().getHostAddress() + ":" +
sender.getPort(), sender);
}catch ( java.io.IOException x ) {
log.error("Unable to create and add a IDataSender object.",x);
}
}//add
public synchronized void remove(Member member)
{
String key = member.getHost() + ":" + member.getPort();
IDataSender toberemoved = (IDataSender) map.get(key);
if (toberemoved == null)return;
toberemoved.disconnect();
map.remove(key);
}
public void start() throws java.io.IOException
{
//don't have to do shit, we connect on demand
}
public synchronized void stop()
{
java.util.Iterator i = map.entrySet().iterator();
while ( i.hasNext() )
{
IDataSender sender = (IDataSender)((java.util.Map.Entry)i.next()).getValue();
try { sender.disconnect(); } catch ( Exception x ){}
}//while
}//stop
public IDataSender[] getSenders()
{
java.util.Iterator i = map.entrySet().iterator();
java.util.Vector v = new java.util.Vector();
while ( i.hasNext() )
{
IDataSender sender = (IDataSender)((java.util.Map.Entry)i.next()).getValue();
if ( sender!=null) v.addElement(sender);
}
IDataSender[] result = new IDataSender[v.size()];
v.copyInto(result);
return result;
}
protected void sendMessageData(String sessionId, byte[] data, IDataSender sender) throws java.io.IOException {
if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
try
{
if (!sender.isConnected())
sender.connect();
sender.sendMessage(sessionId,data);
sender.setSuspect(false);
addStats(data.length);
}catch ( Exception x)
{
if ( !sender.getSuspect() ) {
log.warn("Unable to send replicated message, is server down?",
x);
}
sender.setSuspect(true);
}
}
public void sendMessage(String sessionId, byte[] indata, Member member) throws java.io.IOException
{
byte[] data = XByteBuffer.createDataPackage(indata);
String key = member.getHost()+":"+member.getPort();
IDataSender sender = (IDataSender)map.get(key);
sendMessageData(sessionId,data,sender);
}
public void sendMessage(String sessionId, byte[] indata) throws java.io.IOException
{
IDataSender[] senders = getSenders();
byte[] data = XByteBuffer.createDataPackage(indata);
for ( int i=0; i<senders.length; i++ )
{
IDataSender sender = senders[i];
try
{
sendMessageData(sessionId,data,sender);
}catch ( Exception x)
{
if ( !sender.getSuspect()) log.warn("Unable to send replicated message to "+sender+", is server down?",x);
sender.setSuspect(true);
}
}//while
}
public String getReplicationMode() {
return replicationMode;
}
public boolean getIsSenderSynchronized() {
return IDataSenderFactory.SYNC_MODE.equals(replicationMode) ||
IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
}
}