| package org.apache.catalina.tribes.test.channel; |
| |
| import junit.framework.TestCase; |
| import java.io.Serializable; |
| import java.util.Random; |
| import java.util.Arrays; |
| import org.apache.catalina.tribes.ChannelListener; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.group.GroupChannel; |
| import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener; |
| import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; |
| import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; |
| |
| /** |
| * <p>Title: </p> |
| * |
| * <p>Description: </p> |
| * |
| * <p>Copyright: Copyright (c) 2005</p> |
| * |
| * <p>Company: </p> |
| * |
| * @author not attributable |
| * @version 1.0 |
| */ |
| public class TestDataIntegrity extends TestCase { |
| int msgCount = 1000; |
| int threadCount = 20; |
| GroupChannel channel1; |
| GroupChannel channel2; |
| Listener listener1; |
| int threadCounter = 0; |
| protected void setUp() throws Exception { |
| super.setUp(); |
| channel1 = new GroupChannel(); |
| channel1.addInterceptor(new MessageDispatch15Interceptor()); |
| channel2 = new GroupChannel(); |
| channel2.addInterceptor(new MessageDispatch15Interceptor()); |
| listener1 = new Listener(); |
| channel2.addChannelListener(listener1); |
| channel1.start(GroupChannel.DEFAULT); |
| channel2.start(GroupChannel.DEFAULT); |
| } |
| |
| protected void tearDown() throws Exception { |
| super.tearDown(); |
| channel1.stop(GroupChannel.DEFAULT); |
| channel2.stop(GroupChannel.DEFAULT); |
| } |
| |
| public void testDataSendNO_ACK() throws Exception { |
| System.err.println("Starting NO_ACK"); |
| Thread[] threads = new Thread[threadCount]; |
| for (int x=0; x<threads.length; x++ ) { |
| threads[x] = new Thread() { |
| public void run() { |
| try { |
| for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0); |
| }catch ( Exception x ) { |
| x.printStackTrace(); |
| return; |
| } finally { |
| threadCounter++; |
| } |
| } |
| }; |
| } |
| for (int x=0; x<threads.length; x++ ) { threads[x].start();} |
| for (int x=0; x<threads.length; x++ ) { threads[x].join();} |
| //sleep for 50 sec, let the other messages in |
| long start = System.currentTimeMillis(); |
| while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); |
| System.err.println("Finished NO_ACK"); |
| assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); |
| } |
| |
| public void testDataSendASYNCM() throws Exception { |
| System.err.println("Starting ASYNC MULTI THREAD"); |
| Thread[] threads = new Thread[threadCount]; |
| for (int x=0; x<threads.length; x++ ) { |
| threads[x] = new Thread() { |
| public void run() { |
| try { |
| for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS); |
| }catch ( Exception x ) { |
| x.printStackTrace(); |
| return; |
| } finally { |
| threadCounter++; |
| } |
| } |
| }; |
| } |
| for (int x=0; x<threads.length; x++ ) { threads[x].start();} |
| for (int x=0; x<threads.length; x++ ) { threads[x].join();} |
| //sleep for 50 sec, let the other messages in |
| long start = System.currentTimeMillis(); |
| while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); |
| System.err.println("Finished ASYNC MULTI THREAD"); |
| assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); |
| } |
| public void testDataSendASYNC() throws Exception { |
| System.err.println("Starting ASYNC"); |
| for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS); |
| //sleep for 50 sec, let the other messages in |
| long start = System.currentTimeMillis(); |
| while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500); |
| System.err.println("Finished ASYNC"); |
| assertEquals("Checking success messages.",msgCount,listener1.count); |
| } |
| |
| public void testDataSendACK() throws Exception { |
| System.err.println("Starting ACK"); |
| for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK); |
| Thread.sleep(250); |
| System.err.println("Finished ACK"); |
| assertEquals("Checking success messages.",msgCount,listener1.count); |
| } |
| |
| public void testDataSendSYNCACK() throws Exception { |
| System.err.println("Starting SYNC_ACK"); |
| for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK); |
| Thread.sleep(250); |
| System.err.println("Finished SYNC_ACK"); |
| assertEquals("Checking success messages.",msgCount,listener1.count); |
| } |
| |
| public static class Listener implements ChannelListener { |
| long count = 0; |
| public boolean accept(Serializable s, Member m) { |
| return (s instanceof Data); |
| } |
| |
| public void messageReceived(Serializable s, Member m) { |
| Data d = (Data)s; |
| if ( !Data.verify(d) ) { |
| System.err.println("ERROR"); |
| } else { |
| count++; |
| if ((count %1000) ==0 ) { |
| System.err.println("SUCCESS:"+count); |
| } |
| } |
| } |
| } |
| |
| public static class Data implements Serializable { |
| public int length; |
| public byte[] data; |
| public byte key; |
| public static Random r = new Random(System.currentTimeMillis()); |
| public static Data createRandomData() { |
| int i = r.nextInt(); |
| i = ( i % 127 ); |
| int length = Math.abs(r.nextInt() % 65555); |
| Data d = new Data(); |
| d.length = length; |
| d.key = (byte)i; |
| d.data = new byte[length]; |
| Arrays.fill(d.data,d.key); |
| return d; |
| } |
| |
| public static boolean verify(Data d) { |
| boolean result = (d.length == d.data.length); |
| for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; |
| return result; |
| } |
| } |
| |
| |
| |
| } |