package org.apache.catalina.tribes.test.channel; | |
import junit.framework.TestCase; | |
import java.io.Serializable; | |
import java.util.Random; | |
import java.util.Arrays; | |
import org.apache.catalina.tribes.ChannelListener; | |
import org.apache.catalina.tribes.Member; | |
import org.apache.catalina.tribes.group.GroupChannel; | |
import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener; | |
import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; | |
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; | |
/** | |
* <p>Title: </p> | |
* | |
* <p>Description: </p> | |
* | |
* <p>Copyright: Copyright (c) 2005</p> | |
* | |
* <p>Company: </p> | |
* | |
* @author not attributable | |
* @version 1.0 | |
*/ | |
public class TestDataIntegrity extends TestCase { | |
int msgCount = 1000; | |
int threadCount = 20; | |
GroupChannel channel1; | |
GroupChannel channel2; | |
Listener listener1; | |
int threadCounter = 0; | |
protected void setUp() throws Exception { | |
super.setUp(); | |
channel1 = new GroupChannel(); | |
channel1.addInterceptor(new MessageDispatch15Interceptor()); | |
channel2 = new GroupChannel(); | |
channel2.addInterceptor(new MessageDispatch15Interceptor()); | |
listener1 = new Listener(); | |
channel2.addChannelListener(listener1); | |
channel1.start(GroupChannel.DEFAULT); | |
channel2.start(GroupChannel.DEFAULT); | |
} | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
channel1.stop(GroupChannel.DEFAULT); | |
channel2.stop(GroupChannel.DEFAULT); | |
} | |
public void testDataSendNO_ACK() throws Exception { | |
System.err.println("Starting NO_ACK"); | |
Thread[] threads = new Thread[threadCount]; | |
for (int x=0; x<threads.length; x++ ) { | |
threads[x] = new Thread() { | |
public void run() { | |
try { | |
for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0); | |
}catch ( Exception x ) { | |
x.printStackTrace(); | |
return; | |
} finally { | |
threadCounter++; | |
} | |
} | |
}; | |
} | |
for (int x=0; x<threads.length; x++ ) { threads[x].start();} | |
for (int x=0; x<threads.length; x++ ) { threads[x].join();} | |
//sleep for 50 sec, let the other messages in | |
long start = System.currentTimeMillis(); | |
while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); | |
System.err.println("Finished NO_ACK"); | |
assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); | |
} | |
public void testDataSendASYNCM() throws Exception { | |
System.err.println("Starting ASYNC MULTI THREAD"); | |
Thread[] threads = new Thread[threadCount]; | |
for (int x=0; x<threads.length; x++ ) { | |
threads[x] = new Thread() { | |
public void run() { | |
try { | |
for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS); | |
}catch ( Exception x ) { | |
x.printStackTrace(); | |
return; | |
} finally { | |
threadCounter++; | |
} | |
} | |
}; | |
} | |
for (int x=0; x<threads.length; x++ ) { threads[x].start();} | |
for (int x=0; x<threads.length; x++ ) { threads[x].join();} | |
//sleep for 50 sec, let the other messages in | |
long start = System.currentTimeMillis(); | |
while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); | |
System.err.println("Finished ASYNC MULTI THREAD"); | |
assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); | |
} | |
public void testDataSendASYNC() throws Exception { | |
System.err.println("Starting ASYNC"); | |
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS); | |
//sleep for 50 sec, let the other messages in | |
long start = System.currentTimeMillis(); | |
while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500); | |
System.err.println("Finished ASYNC"); | |
assertEquals("Checking success messages.",msgCount,listener1.count); | |
} | |
public void testDataSendACK() throws Exception { | |
System.err.println("Starting ACK"); | |
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK); | |
Thread.sleep(250); | |
System.err.println("Finished ACK"); | |
assertEquals("Checking success messages.",msgCount,listener1.count); | |
} | |
public void testDataSendSYNCACK() throws Exception { | |
System.err.println("Starting SYNC_ACK"); | |
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK); | |
Thread.sleep(250); | |
System.err.println("Finished SYNC_ACK"); | |
assertEquals("Checking success messages.",msgCount,listener1.count); | |
} | |
public static class Listener implements ChannelListener { | |
long count = 0; | |
public boolean accept(Serializable s, Member m) { | |
return (s instanceof Data); | |
} | |
public void messageReceived(Serializable s, Member m) { | |
Data d = (Data)s; | |
if ( !Data.verify(d) ) { | |
System.err.println("ERROR"); | |
} else { | |
count++; | |
if ((count %1000) ==0 ) { | |
System.err.println("SUCCESS:"+count); | |
} | |
} | |
} | |
} | |
public static class Data implements Serializable { | |
public int length; | |
public byte[] data; | |
public byte key; | |
public static Random r = new Random(System.currentTimeMillis()); | |
public static Data createRandomData() { | |
int i = r.nextInt(); | |
i = ( i % 127 ); | |
int length = Math.abs(r.nextInt() % 65555); | |
Data d = new Data(); | |
d.length = length; | |
d.key = (byte)i; | |
d.data = new byte[length]; | |
Arrays.fill(d.data,d.key); | |
return d; | |
} | |
public static boolean verify(Data d) { | |
boolean result = (d.length == d.data.length); | |
for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; | |
return result; | |
} | |
} | |
} |