package org.apache.catalina.tribes.test.channel;

import junit.framework.TestCase;
import java.io.Serializable;
import java.util.Random;
import java.util.Arrays;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;

/**
 * <p>Title: </p> 
 * 
 * <p>Description: </p> 
 * 
 * <p>Copyright: Copyright (c) 2005</p> 
 * 
 * <p>Company: </p>
 * 
 * @author not attributable
 * @version 1.0
 */
public class TestDataIntegrity extends TestCase {
    int msgCount = 1000;
    int threadCount = 20;
    GroupChannel channel1;
    GroupChannel channel2;
    Listener listener1;
    int threadCounter = 0;
    protected void setUp() throws Exception {
        super.setUp();
        channel1 = new GroupChannel();
        channel1.addInterceptor(new MessageDispatch15Interceptor());
        channel2 = new GroupChannel();
        channel2.addInterceptor(new MessageDispatch15Interceptor());
        listener1 = new Listener();
        channel2.addChannelListener(listener1);
        channel1.start(GroupChannel.DEFAULT);
        channel2.start(GroupChannel.DEFAULT);
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        channel1.stop(GroupChannel.DEFAULT);
        channel2.stop(GroupChannel.DEFAULT);
    }
    
    public void testDataSendNO_ACK() throws Exception {
        System.err.println("Starting NO_ACK");
        Thread[] threads = new Thread[threadCount];
        for (int x=0; x<threads.length; x++ ) {
            threads[x] = new Thread() {
                public void run() {
                    try {
                        for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0);
                    }catch ( Exception x ) {
                        x.printStackTrace();
                        return;
                    } finally {
                        threadCounter++;
                    }
                }
            };
        }
        for (int x=0; x<threads.length; x++ ) { threads[x].start();}
        for (int x=0; x<threads.length; x++ ) { threads[x].join();}
        //sleep for 50 sec, let the other messages in
        long start = System.currentTimeMillis();
        while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
        System.err.println("Finished NO_ACK");
        assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
    }
    
    public void testDataSendASYNCM() throws Exception {
            System.err.println("Starting ASYNC MULTI THREAD");
            Thread[] threads = new Thread[threadCount];
            for (int x=0; x<threads.length; x++ ) {
                threads[x] = new Thread() {
                    public void run() {
                        try {
                            for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
                        }catch ( Exception x ) {
                            x.printStackTrace();
                            return;
                        } finally {
                            threadCounter++;
                        }
                    }
                };
            }
            for (int x=0; x<threads.length; x++ ) { threads[x].start();}
            for (int x=0; x<threads.length; x++ ) { threads[x].join();}
            //sleep for 50 sec, let the other messages in
            long start = System.currentTimeMillis();
            while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
            System.err.println("Finished ASYNC MULTI THREAD");
            assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
    }
    public void testDataSendASYNC() throws Exception {
        System.err.println("Starting ASYNC");
        for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
        //sleep for 50 sec, let the other messages in
        long start = System.currentTimeMillis();
        while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500);
        System.err.println("Finished ASYNC");
        assertEquals("Checking success messages.",msgCount,listener1.count);
    }

    public void testDataSendACK() throws Exception {
        System.err.println("Starting ACK");
        for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK);
        Thread.sleep(250);
        System.err.println("Finished ACK");
        assertEquals("Checking success messages.",msgCount,listener1.count);
    }

    public void testDataSendSYNCACK() throws Exception {
        System.err.println("Starting SYNC_ACK");
        for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
        Thread.sleep(250);
        System.err.println("Finished SYNC_ACK");
        assertEquals("Checking success messages.",msgCount,listener1.count);
    }

    public static class Listener implements ChannelListener {
        long count = 0;
        public boolean accept(Serializable s, Member m) {
            return (s instanceof Data);
        }
        
        public void messageReceived(Serializable s, Member m) {
            Data d = (Data)s;
            if ( !Data.verify(d) ) {
                System.err.println("ERROR");
            } else {
                count++;
                if ((count %1000) ==0 ) {
                    System.err.println("SUCCESS:"+count);
                }
            }
        }
    }
    
    public static class Data implements Serializable {
        public int length;
        public byte[] data;
        public byte key;
        public static Random r = new Random(System.currentTimeMillis());
        public static Data createRandomData() {
            int i = r.nextInt();
            i = ( i % 127 );
            int length = Math.abs(r.nextInt() % 65555);
            Data d = new Data();
            d.length = length;
            d.key = (byte)i;
            d.data = new byte[length];
            Arrays.fill(d.data,d.key);
            return d;
        }
        
        public static boolean verify(Data d) {
            boolean result = (d.length == d.data.length);
            for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
            return result;
        }
    }
    
    

}
