blob: 0c2ff20aed2a9760b78768a7f9f7b0ce0dcf5e62 [file] [log] [blame]
package org.apache.catalina.tribes.demos;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.util.Arrays;
/**
 * Interactive console demo that runs several Tribes {@link GroupChannel}
 * instances inside one JVM, each with a {@link NonBlockingCoordinator}
 * interceptor, and prints a table showing every member's local name,
 * start state, elected coordinator, view id and view size.
 * <p>
 * Commands are read from standard input: {@code start [member id]},
 * {@code stop [member id]}, {@code print} and {@code quit}. Any command that
 * is not {@code start}, {@code stop} or {@code quit} simply refreshes the
 * screen.
 */
public class CoordinationDemo {
    /** Number of channels (members) managed by the demo; option {@code -c}. */
    static int CHANNEL_COUNT = 5;
    /** Width, in characters, of the separator line printed under the header. */
    static int SCREEN_WIDTH = 120;
    /** Pause in milliseconds after every interceptor event; option {@code -s}. */
    static long SLEEP_TIME = 10;
    /** Number of blank lines printed to "clear" the console; option {@code -sc}. */
    static int CLEAR_SCREEN = 30;
    /** When true, start/stop of all members uses one thread per member; option {@code -t}. */
    static boolean MULTI_THREAD = false;
    /** Indexed by interceptor event type: whether that event refreshes the screen; option {@code -p}. */
    static boolean[] VIEW_EVENTS = new boolean[255];

    /** Text shown after "Overall status:" on the screen. */
    StringBuffer statusLine = new StringBuffer();
    /** One entry per member; allocated by {@link #init()}, filled by {@link #waitForInput()}. */
    Status[] status = null;
    /** Reader over standard input; created by {@link #init()}. */
    BufferedReader reader = null;

    /**
     * Creates the demo object. Nothing is allocated here; call
     * {@link #init()} before {@link #waitForInput()}.
     */
    public CoordinationDemo() {
    }

    /**
     * Opens the standard-input reader and allocates the member table with
     * {@link #CHANNEL_COUNT} (initially empty) slots.
     */
    public void init() {
        reader = new BufferedReader(new InputStreamReader(System.in));
        status = new Status[CHANNEL_COUNT];
    }

    /** Scrolls the console by printing {@link #CLEAR_SCREEN} newlines. */
    public void clearScreen() {
        StringBuffer buf = new StringBuffer(700);
        for (int i = 0; i < CLEAR_SCREEN; i++) {
            buf.append("\n");
        }
        System.out.println(buf);
    }

    /** Prints the list of available commands followed by the input prompt. */
    public void printMenuOptions() {
        System.out.println("Commands:");
        System.out.println("\tstart [member id]");
        System.out.println("\tstop [member id]");
        System.out.println("\tprint (refresh)");
        System.out.println("\tquit");
        System.out.print("Enter command:");
    }

    /**
     * Redraws the whole screen: header, one numbered row per member slot,
     * the overall status line and the command menu. Synchronized because it
     * is also invoked from the interceptor callback of every channel.
     */
    public synchronized void printScreen() {
        clearScreen();
        System.out.println(" ###." + getHeader());
        for (int i = 0; i < status.length; i++) {
            System.out.print(leftfill(String.valueOf(i + 1) + ".", 5, " "));
            if (status[i] != null) {
                System.out.print(status[i].getStatusLine());
            }
        }
        System.out.println("\n\n");
        System.out.println("Overall status:" + statusLine);
        printMenuOptions();
    }

    /**
     * Builds the two-line table header: the column titles followed by a
     * separator line that starts with the current timestamp.
     *
     * @return the header text, terminated by a newline
     */
    public String getHeader() {
        // Column widths, matching Status.getStatusLine():
        //   member 30, running 10, coord 30, view-id 24, view count 8
        StringBuffer buf = new StringBuffer();
        buf.append(leftfill("Member", 30, " "));
        buf.append(leftfill("Running", 10, " "));
        buf.append(leftfill("Coord", 30, " "));
        buf.append(leftfill("View-id(short)", 24, " "));
        buf.append(leftfill("Count", 8, " "));
        buf.append("\n");
        buf.append(rightfill("===" + new java.sql.Timestamp(System.currentTimeMillis()).toString(), SCREEN_WIDTH, "="));
        buf.append("\n");
        return buf.toString();
    }

    /**
     * Splits a command line on spaces.
     *
     * @param line the raw input line; {@code null} (end of input) is allowed
     * @return the tokens in order; an empty array for a blank or {@code null} line
     */
    public String[] tokenize(String line) {
        if (line == null) {
            // readLine() returns null at end of stream; treat it like an empty line.
            return new String[0];
        }
        StringTokenizer tz = new StringTokenizer(line, " ");
        String[] result = new String[tz.countTokens()];
        for (int i = 0; i < result.length; i++) {
            result[i] = tz.nextToken();
        }
        return result;
    }

    /**
     * Runs the command loop. Fills the member table with fresh
     * {@link Status} objects, then reads and executes commands until
     * {@code quit}, a blank line or end of input is seen, and finally stops
     * every member.
     *
     * @throws IOException if reading from standard input fails
     */
    public void waitForInput() throws IOException {
        for (int i = 0; i < status.length; i++) {
            status[i] = new Status(this);
        }
        printScreen();
        String[] args = tokenize(reader.readLine());
        while (args.length >= 1 && !"quit".equalsIgnoreCase(args[0])) {
            if ("start".equalsIgnoreCase(args[0])) {
                cmdStart(args);
            } else if ("stop".equalsIgnoreCase(args[0])) {
                cmdStop(args);
            }
            printScreen();
            args = tokenize(reader.readLine());
        }
        for (int i = 0; i < status.length; i++) {
            status[i].stop();
        }
    }

    /**
     * Handles {@code stop} (all members) and {@code stop <id>} (one member,
     * 1-based id).
     */
    private void cmdStop(String[] args) {
        if (args.length == 1) {
            setSystemStatus("System shutting down...");
            runOnAllMembers(false);
            setSystemStatus("System stopped.");
        } else {
            int index = parseMemberIndex(args[1]);
            if (index >= 0) {
                setSystemStatus("Stopping member:" + (index + 1));
                status[index].stop();
                setSystemStatus("Member stopped:" + (index + 1));
            }
        }
    }

    /**
     * Handles {@code start} (all members) and {@code start <id>} (one
     * member, 1-based id).
     */
    private void cmdStart(String[] args) {
        if (args.length == 1) {
            setSystemStatus("System starting up...");
            runOnAllMembers(true);
            setSystemStatus("System started.");
        } else {
            int index = parseMemberIndex(args[1]);
            if (index >= 0) {
                setSystemStatus("Starting member:" + (index + 1));
                status[index].start();
                setSystemStatus("Member started:" + (index + 1));
            }
        }
    }

    /**
     * Converts a 1-based member id typed by the user into an index into
     * {@link #status}. On a non-numeric or out-of-range id the overall
     * status is set to an "Invalid index" message.
     *
     * @return the zero-based index, or -1 if the id is not usable
     */
    private int parseMemberIndex(String arg) {
        try {
            int index = Integer.parseInt(arg) - 1;
            if (index >= 0 && index < status.length) {
                return index;
            }
        } catch (NumberFormatException x) {
            // fall through to the shared error message below
        }
        setSystemStatus("Invalid index:" + arg);
        return -1;
    }

    /**
     * Starts or stops every member. With {@link #MULTI_THREAD} set, each
     * member is handled on its own thread and this method returns without
     * waiting for them; otherwise the members are handled one after another
     * on the calling thread.
     *
     * @param start {@code true} to start the members, {@code false} to stop them
     */
    private void runOnAllMembers(final boolean start) {
        Thread[] workers = new Thread[status.length];
        for (int i = 0; i < status.length; i++) {
            final Status member = status[i];
            workers[i] = new Thread() {
                @Override
                public void run() {
                    if (start) {
                        member.start();
                    } else {
                        member.stop();
                    }
                }
            };
        }
        for (int i = 0; i < workers.length; i++) {
            if (MULTI_THREAD) {
                workers[i].start();
            } else {
                // Deliberate direct call: single-threaded mode runs the work inline.
                workers[i].run();
            }
        }
    }

    /**
     * Replaces the text shown after "Overall status:".
     *
     * @param status the new status text
     */
    public void setSystemStatus(String status) {
        // Hold the buffer's monitor across both calls so a concurrent
        // printScreen() never reads the buffer between delete and append.
        synchronized (statusLine) {
            statusLine.delete(0, statusLine.length());
            statusLine.append(status);
        }
    }

    /**
     * Selects which interceptor event types refresh the screen. All types
     * are disabled first, then every listed type is enabled.
     *
     * @param events comma-separated event type numbers, e.g. {@code "1,2,5,7"};
     *               whitespace around a number and empty entries are ignored
     * @throws NumberFormatException    if an entry is not an integer
     * @throws IllegalArgumentException if an entry is outside the
     *                                  {@link #VIEW_EVENTS} index range
     */
    public static void setEvents(String events) {
        java.util.Arrays.fill(VIEW_EVENTS, false);
        StringTokenizer t = new StringTokenizer(events, ",");
        while (t.hasMoreTokens()) {
            String token = t.nextToken().trim();
            if (token.length() == 0) {
                continue;
            }
            int idx = Integer.parseInt(token);
            if (idx < 0 || idx >= VIEW_EVENTS.length) {
                throw new IllegalArgumentException(
                        "Event type out of range [0," + (VIEW_EVENTS.length - 1) + "]:" + idx);
            }
            VIEW_EVENTS[idx] = true;
        }
    }

    /**
     * Prints the usage text, applies the command-line options to the static
     * settings, then initialises the demo and enters its command loop.
     * {@code -h} exits the JVM right after the usage text.
     *
     * @param args command-line arguments
     * @param demo the demo instance to run
     * @throws IllegalArgumentException if an option is missing its value
     * @throws Exception                if option parsing or the command loop fails
     */
    public static void run(String[] args, CoordinationDemo demo) throws Exception {
        usage();
        java.util.Arrays.fill(VIEW_EVENTS, true);
        for (int i = 0; i < args.length; i++) {
            if ("-c".equals(args[i])) {
                CHANNEL_COUNT = Integer.parseInt(optionValue(args, ++i));
            } else if ("-t".equals(args[i])) {
                MULTI_THREAD = Boolean.parseBoolean(optionValue(args, ++i));
            } else if ("-s".equals(args[i])) {
                SLEEP_TIME = Long.parseLong(optionValue(args, ++i));
            } else if ("-sc".equals(args[i])) {
                CLEAR_SCREEN = Integer.parseInt(optionValue(args, ++i));
            } else if ("-p".equals(args[i])) {
                setEvents(optionValue(args, ++i));
            } else if ("-h".equals(args[i])) {
                System.exit(0);
            }
        }
        demo.init();
        demo.waitForInput();
    }

    /**
     * Returns the value following an option, failing with a readable
     * message instead of an array-index error when it is absent.
     *
     * @param args       the full argument array
     * @param valueIndex index at which the value is expected (option is at valueIndex-1)
     */
    private static String optionValue(String[] args, int valueIndex) {
        if (valueIndex >= args.length) {
            throw new IllegalArgumentException("Missing value for option " + args[valueIndex - 1]);
        }
        return args[valueIndex];
    }

    /** Prints the command-line usage and a few example invocations. */
    private static void usage() {
        System.out.println("Usage:");
        System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)");
        System.out.println("Example:");
        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels");
        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels");
        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen");
        System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event");
        System.out.println();
    }

    /**
     * Program entry point: creates a demo and hands it to
     * {@link #run(String[], CoordinationDemo)}.
     */
    public static void main(String[] args) throws Exception {
        CoordinationDemo demo = new CoordinationDemo();
        run(args, demo);
    }

    /**
     * Pads {@code value} on the left (i.e. right-aligns it) to {@code length}.
     *
     * @see #fill(String, int, String, boolean)
     */
    public static String leftfill(String value, int length, String ch) {
        return fill(value, length, ch, true);
    }

    /**
     * Pads {@code value} on the right (i.e. left-aligns it) to {@code length}.
     *
     * @see #fill(String, int, String, boolean)
     */
    public static String rightfill(String value, int length, String ch) {
        return fill(value, length, ch, false);
    }

    /**
     * Trims {@code value} and pads it with repetitions of {@code ch} until
     * {@code length} pad-units have been reached. A value already at least
     * {@code length} characters long is returned trimmed but not truncated.
     *
     * @param value  the text to pad; {@code null} is treated as empty
     * @param length the target width, counted in characters of the trimmed
     *               value plus one per appended {@code ch}
     * @param ch     the padding string appended once per missing position
     * @param left   {@code true} to put the padding before the value,
     *               {@code false} to put it after
     * @return the padded text
     */
    public static String fill(String value, int length, String ch, boolean left) {
        String text = (value == null) ? "" : value.trim();
        StringBuffer buf = new StringBuffer();
        if (!left) {
            buf.append(text);
        }
        for (int i = text.length(); i < length; i++) {
            buf.append(ch);
        }
        if (left) {
            buf.append(text);
        }
        return buf.toString();
    }

    /**
     * State of one demo member: its channel, its coordinator interceptor,
     * the description of the last interceptor event and the start state
     * shown in the "Running" column.
     */
    public static class Status {
        /** Demo that owns this member; used to trigger screen refreshes. */
        public CoordinationDemo parent;
        /** The member's channel, or {@code null} while it is not started. */
        public GroupChannel channel;
        /** Coordinator interceptor of {@link #channel}, or {@code null} while not started. */
        NonBlockingCoordinator interceptor = null;
        /** Description of the last event or start/stop outcome. */
        public String status;
        /** Last exception raised by {@link #start()} or {@link #stop()}, if any. */
        public Exception error;
        /** One of "new", "starting", "running", "failed", "stopped". */
        public String startstatus = "new";

        /**
         * @param parent the demo whose screen is refreshed on interceptor events
         */
        public Status(CoordinationDemo parent) {
            this.parent = parent;
        }

        /**
         * Formats this member as one table row plus a "Status:" line.
         *
         * @return two newline-terminated lines of text
         */
        public String getStatusLine() {
            // Column widths, matching CoordinationDemo.getHeader():
            //   member 30, running 10, coord 30, view-id 24, view count 8
            StringBuffer buf = new StringBuffer();
            String local = "";
            String coord = "";
            String viewId = "";
            String count = "0";
            // Snapshot both fields: this method runs from interceptor callbacks
            // while start()/stop() may be reassigning them on another thread.
            GroupChannel ch = channel;
            NonBlockingCoordinator ic = interceptor;
            if (ch != null) {
                Member lm = ch.getLocalMember(false);
                local = lm != null ? lm.getName() : "";
                if (ic != null) {
                    Member c = ic.getCoordinator();
                    coord = c != null ? c.getName() : "";
                    viewId = getByteString(ic.getViewId() != null ? ic.getViewId().getBytes() : new byte[0]);
                    count = String.valueOf(ic.getView().length);
                }
            }
            buf.append(leftfill(local, 30, " "));
            buf.append(leftfill(startstatus, 10, " "));
            buf.append(leftfill(coord, 30, " "));
            buf.append(leftfill(viewId, 24, " "));
            buf.append(leftfill(count, 8, " "));
            buf.append("\n");
            buf.append("Status:" + status);
            buf.append("\n");
            return buf.toString();
        }

        /**
         * Renders at most the first four bytes of {@code b}.
         *
         * @param b the bytes to render; may be {@code null}
         * @return {@code "{}"} for {@code null}, otherwise the Tribes
         *         {@code Arrays.toString} rendering of the leading bytes
         */
        public String getByteString(byte[] b) {
            if (b == null) {
                return "{}";
            }
            return Arrays.toString(b, 0, Math.min(b.length, 4));
        }

        /**
         * Creates and starts the channel if this member has none. On failure
         * the stack trace is written to standard error, the error is
         * recorded, the half-started channel is stopped on a best-effort
         * basis and the member returns to the "no channel" state.
         */
        public void start() {
            try {
                if (channel == null) {
                    channel = createChannel();
                    startstatus = "starting";
                    channel.start(GroupChannel.DEFAULT);
                    startstatus = "running";
                } else {
                    status = "Channel already started.";
                }
            } catch (Exception x) {
                printFailure("Start failed:", x);
                status = "Start failed:" + x.getMessage();
                error = x;
                startstatus = "failed";
                GroupChannel failed = channel;
                if (failed != null) {
                    try {
                        failed.stop(GroupChannel.DEFAULT);
                    } catch (Exception ignore) {
                        // Best-effort cleanup; the start failure is what gets reported.
                    }
                }
                channel = null;
                interceptor = null;
            }
        }

        /**
         * Stops the channel if one exists. Whatever happens, the member ends
         * in the "stopped" state with no channel and no interceptor.
         */
        public void stop() {
            try {
                if (channel != null) {
                    channel.stop(GroupChannel.DEFAULT);
                    status = "Channel Stopped";
                } else {
                    status = "Channel Already Stopped";
                }
            } catch (Exception x) {
                printFailure("Stop failed:", x);
                status = "Stop failed:" + x.getMessage();
                error = x;
            } finally {
                startstatus = "stopped";
                channel = null;
                interceptor = null;
            }
        }

        /**
         * Writes a header line and the stack frames of {@code x} to standard
         * error as one uninterrupted group.
         */
        private static void printFailure(String header, Exception x) {
            synchronized (System.err) {
                System.err.println(header);
                StackTraceElement[] els = x.getStackTrace();
                for (int i = 0; i < els.length; i++) {
                    System.err.println(els[i].toString());
                }
            }
        }

        /**
         * Builds a new channel and stores it, together with its coordinator
         * interceptor, in this object's fields. The interceptor records each
         * event's description in {@link #status}, refreshes the screen when
         * the event type is enabled in {@link CoordinationDemo#VIEW_EVENTS}
         * and then pauses for {@link CoordinationDemo#SLEEP_TIME} ms.
         *
         * @return the newly created (not yet started) channel
         */
        public GroupChannel createChannel() {
            channel = new GroupChannel();
            ((ReceiverBase) channel.getChannelReceiver()).setAutoBind(100);
            interceptor = new NonBlockingCoordinator() {
                public void fireInterceptorEvent(InterceptorEvent event) {
                    status = event.getEventTypeDesc();
                    int type = event.getEventType();
                    // Event types outside the table are never displayed.
                    boolean display = type >= 0 && type < VIEW_EVENTS.length && VIEW_EVENTS[type];
                    if (display) {
                        parent.printScreen();
                    }
                    if (SLEEP_TIME > 0) {
                        try {
                            Thread.sleep(SLEEP_TIME);
                        } catch (InterruptedException x) {
                            // Keep the interrupt visible to the thread that owns this callback.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };
            channel.addInterceptor(interceptor);
            channel.addInterceptor(new TcpFailureDetector());
            channel.addInterceptor(new MessageDispatch15Interceptor());
            return channel;
        }
    }
}