| package org.apache.catalina.tribes.demos; |
| |
| import java.io.Serializable; |
| |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.group.RpcCallback; |
| import org.apache.catalina.tribes.Channel; |
| import org.apache.catalina.tribes.ManagedChannel; |
| import org.apache.catalina.tribes.group.RpcChannel; |
| import org.apache.catalina.tribes.group.Response; |
| |
| |
| /** |
| * <p>Title: </p> |
| * |
| * <p>Description: </p> |
| * |
| * <p>Copyright: Copyright (c) 2005</p> |
| * |
| * <p>Company: </p> |
| * |
| * @author not attributable |
| * @version 1.0 |
| */ |
| public class EchoRpcTest implements RpcCallback, Runnable { |
| |
| Channel channel; |
| int count; |
| String message; |
| long pause; |
| RpcChannel rpc; |
| int options; |
| long timeout; |
| String name; |
| |
| public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) { |
| this.channel = channel; |
| this.count = count; |
| this.message = message; |
| this.pause = pause; |
| this.options = options; |
| this.rpc = new RpcChannel(name.getBytes(),channel,this); |
| this.timeout = timeout; |
| this.name = name; |
| } |
| |
| /** |
| * If the reply has already been sent to the requesting thread, the rpc |
| * callback can handle any data that comes in after the fact. |
| * |
| * @param msg Serializable |
| * @param sender Member |
| * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback |
| * method |
| */ |
| public void leftOver(Serializable msg, Member sender) { |
| System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]"); |
| } |
| |
| /** |
| * |
| * @param msg Serializable |
| * @param sender Member |
| * @return Serializable - null if no reply should be sent |
| * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback |
| * method |
| */ |
| public Serializable replyRequest(Serializable msg, Member sender) { |
| System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]"); |
| return "Reply("+name+"):"+msg; |
| } |
| |
| public void run() { |
| long counter = 0; |
| while (counter<count) { |
| String msg = message + " cnt="+(++counter); |
| try { |
| System.out.println("Sending ["+msg+"]"); |
| long start = System.currentTimeMillis(); |
| Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout); |
| System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms."); |
| for ( int i=0; i<resp.length; i++ ) { |
| System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]"); |
| } |
| Thread.sleep(pause); |
| }catch(Exception x){} |
| } |
| } |
| |
| public static void usage() { |
| System.out.println("Tribes RPC tester."); |
| System.out.println("Usage:\n\t"+ |
| "java EchoRpcTest [options]\n\t"+ |
| "Options:\n\t\t"+ |
| "[-mode all|first|majority] \n\t\t"+ |
| "[-debug] \n\t\t"+ |
| "[-count messagecount] \n\t\t"+ |
| "[-timeout timeoutinms] \n\t\t"+ |
| "[-stats statinterval] \n\t\t"+ |
| "[-pause nrofsecondstopausebetweensends] \n\t\t"+ |
| "[-message message] \n\t\t"+ |
| "[-name rpcname] \n\t\t"+ |
| "[-break (halts execution on exception)]\n"+ |
| "\tChannel options:"+ |
| ChannelCreator.usage()+"\n\n"+ |
| "Example:\n\t"+ |
| "java EchoRpcTest -port 4004\n\t"+ |
| "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"+ |
| "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n"); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| boolean send = true; |
| boolean debug = false; |
| long pause = 3000; |
| int count = 1000000; |
| int stats = 10000; |
| String name = "EchoRpcId"; |
| boolean breakOnEx = false; |
| int threads = 1; |
| int options = RpcChannel.ALL_REPLY; |
| long timeout = 15000; |
| String message = "EchoRpcMessage"; |
| if ( args.length == 0 ) { |
| args = new String[] {"-help"}; |
| } |
| for (int i = 0; i < args.length; i++) { |
| if ("-threads".equals(args[i])) { |
| threads = Integer.parseInt(args[++i]); |
| } else if ("-count".equals(args[i])) { |
| count = Integer.parseInt(args[++i]); |
| System.out.println("Sending "+count+" messages."); |
| } else if ("-pause".equals(args[i])) { |
| pause = Long.parseLong(args[++i])*1000; |
| } else if ("-break".equals(args[i])) { |
| breakOnEx = true; |
| } else if ("-stats".equals(args[i])) { |
| stats = Integer.parseInt(args[++i]); |
| System.out.println("Stats every "+stats+" message"); |
| } else if ("-timeout".equals(args[i])) { |
| timeout = Long.parseLong(args[++i]); |
| } else if ("-message".equals(args[i])) { |
| message = args[++i]; |
| } else if ("-name".equals(args[i])) { |
| name = args[++i]; |
| } else if ("-mode".equals(args[i])) { |
| if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY; |
| else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY; |
| else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY; |
| } else if ("-debug".equals(args[i])) { |
| debug = true; |
| } else if ("-help".equals(args[i])) |
| { |
| usage(); |
| System.exit(1); |
| } |
| } |
| |
| |
| ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args); |
| EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout); |
| channel.start(channel.DEFAULT); |
| Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); |
| test.run(); |
| |
| System.out.println("System test complete, sleeping to let threads finish."); |
| Thread.sleep(60*1000*60); |
| } |
| |
| public static class Shutdown extends Thread { |
| ManagedChannel channel = null; |
| public Shutdown(ManagedChannel channel) { |
| this.channel = channel; |
| } |
| |
| public void run() { |
| System.out.println("Shutting down..."); |
| SystemExit exit = new SystemExit(5000); |
| exit.setDaemon(true); |
| exit.start(); |
| try { |
| channel.stop(channel.DEFAULT); |
| |
| }catch ( Exception x ) { |
| x.printStackTrace(); |
| } |
| System.out.println("Channel stopped."); |
| } |
| } |
| public static class SystemExit extends Thread { |
| private long delay; |
| public SystemExit(long delay) { |
| this.delay = delay; |
| } |
| public void run () { |
| try { |
| Thread.sleep(delay); |
| }catch ( Exception x ) { |
| x.printStackTrace(); |
| } |
| System.exit(0); |
| |
| } |
| }} |