blob: d3f27fd33cc7e14fd080f7c026dd7b12c17f5fc1 [file] [log] [blame]
package org.apache.catalina.tribes.test.channel;
import junit.framework.TestCase;
import java.io.Serializable;
import java.util.Random;
import java.util.Arrays;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
/**
* <p>Title: </p>
*
* <p>Description: </p>
*
* <p>Copyright: Copyright (c) 2005</p>
*
* <p>Company: </p>
*
* @author not attributable
* @version 1.0
*/
public class TestDataIntegrity extends TestCase {
int msgCount = 1000;
int threadCount = 20;
GroupChannel channel1;
GroupChannel channel2;
Listener listener1;
int threadCounter = 0;
protected void setUp() throws Exception {
super.setUp();
channel1 = new GroupChannel();
channel1.addInterceptor(new MessageDispatch15Interceptor());
channel2 = new GroupChannel();
channel2.addInterceptor(new MessageDispatch15Interceptor());
listener1 = new Listener();
channel2.addChannelListener(listener1);
channel1.start(GroupChannel.DEFAULT);
channel2.start(GroupChannel.DEFAULT);
}
protected void tearDown() throws Exception {
super.tearDown();
channel1.stop(GroupChannel.DEFAULT);
channel2.stop(GroupChannel.DEFAULT);
}
public void testDataSendNO_ACK() throws Exception {
System.err.println("Starting NO_ACK");
Thread[] threads = new Thread[threadCount];
for (int x=0; x<threads.length; x++ ) {
threads[x] = new Thread() {
public void run() {
try {
for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0);
}catch ( Exception x ) {
x.printStackTrace();
return;
} finally {
threadCounter++;
}
}
};
}
for (int x=0; x<threads.length; x++ ) { threads[x].start();}
for (int x=0; x<threads.length; x++ ) { threads[x].join();}
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
System.err.println("Finished NO_ACK");
assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
}
public void testDataSendASYNCM() throws Exception {
System.err.println("Starting ASYNC MULTI THREAD");
Thread[] threads = new Thread[threadCount];
for (int x=0; x<threads.length; x++ ) {
threads[x] = new Thread() {
public void run() {
try {
for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
}catch ( Exception x ) {
x.printStackTrace();
return;
} finally {
threadCounter++;
}
}
};
}
for (int x=0; x<threads.length; x++ ) { threads[x].start();}
for (int x=0; x<threads.length; x++ ) { threads[x].join();}
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
System.err.println("Finished ASYNC MULTI THREAD");
assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
}
public void testDataSendASYNC() throws Exception {
System.err.println("Starting ASYNC");
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500);
System.err.println("Finished ASYNC");
assertEquals("Checking success messages.",msgCount,listener1.count);
}
public void testDataSendACK() throws Exception {
System.err.println("Starting ACK");
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK);
Thread.sleep(250);
System.err.println("Finished ACK");
assertEquals("Checking success messages.",msgCount,listener1.count);
}
public void testDataSendSYNCACK() throws Exception {
System.err.println("Starting SYNC_ACK");
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
Thread.sleep(250);
System.err.println("Finished SYNC_ACK");
assertEquals("Checking success messages.",msgCount,listener1.count);
}
public static class Listener implements ChannelListener {
long count = 0;
public boolean accept(Serializable s, Member m) {
return (s instanceof Data);
}
public void messageReceived(Serializable s, Member m) {
Data d = (Data)s;
if ( !Data.verify(d) ) {
System.err.println("ERROR");
} else {
count++;
if ((count %1000) ==0 ) {
System.err.println("SUCCESS:"+count);
}
}
}
}
public static class Data implements Serializable {
public int length;
public byte[] data;
public byte key;
public static Random r = new Random(System.currentTimeMillis());
public static Data createRandomData() {
int i = r.nextInt();
i = ( i % 127 );
int length = Math.abs(r.nextInt() % 65555);
Data d = new Data();
d.length = length;
d.key = (byte)i;
d.data = new byte[length];
Arrays.fill(d.data,d.key);
return d;
}
public static boolean verify(Data d) {
boolean result = (d.length == d.data.length);
for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
return result;
}
}
}