blob: 11480ca84561fae80f6956d143c4606ff454a3a0 [file] [log] [blame]
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activeio.packet;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import org.apache.activeio.AcceptListener;
import org.apache.activeio.Channel;
import org.apache.activeio.ChannelFactory;
import org.apache.activeio.adapter.SyncToAsyncChannel;
import org.apache.activeio.packet.EOSPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.packet.async.AsyncChannel;
import org.apache.activeio.packet.async.AsyncChannelListener;
import org.apache.activeio.packet.async.AsyncChannelServer;
import org.apache.activeio.stats.CountStatisticImpl;
import org.apache.commons.beanutils.BeanUtils;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
/**
 * Implements a simple tcp echo server for use in benchmarking
 * activeio channel implementations.
 *
 * The server binds an async channel server to the configured url, echoes
 * every packet it receives back to the sending peer and periodically prints
 * the number of active connections and the amount of data echoed.
 *
 * @version $Revision$
 */
public class Server implements Runnable, AcceptListener {

    /** Location the server binds to; stays null until {@link #setUrl(String)} is called. */
    private URI url;
    /** Released by {@link #onAcceptError(IOException)} to make {@link #run()} stop the server. */
    private CountDownLatch shutdownCountDownLatch;
    /** Milliseconds to sleep before echoing each packet; 0 disables the delay. */
    private long requestDelay = 0;
    /** Milliseconds between two consecutive statistics print-outs. */
    private long sampleInterval = 1000;

    private final CountStatisticImpl activeConnectionsCounter = new CountStatisticImpl("activeConnectionsCounter","The number of active connection attached to the server.");
    private final CountStatisticImpl echoedBytesCounter = new CountStatisticImpl("echoedBytesCounter","The number of bytes that have been echoed by the server.");

    /**
     * Command line entry point.
     *
     * Arguments are read as <code>-name value</code> pairs and applied to the
     * bean properties of a new server through
     * <code>BeanUtils.populate</code> (for example <code>-url</code>,
     * <code>-sampleInterval</code> and <code>-requestDelay</code>). The
     * server then runs on the calling thread.
     *
     * @param args the option/value pairs
     * @throws URISyntaxException declared for backward compatibility
     * @throws IllegalAccessException if a property setter cannot be accessed
     * @throws InvocationTargetException if a property setter fails
     */
    public static void main(String[] args) throws URISyntaxException, IllegalAccessException, InvocationTargetException {
        Server server = new Server();

        HashMap options = new HashMap();
        for( int i=0; i < args.length; i++ ) {
            String option = args[i];
            // Every option must look like "-name" and be followed by its value.
            if( !option.startsWith("-") || option.length()<2 || i+1 >= args.length ) {
                System.out.println("Invalid usage.");
                return;
            }
            option = option.substring(1);
            options.put(option, args[++i]);
        }
        BeanUtils.populate(server, options);

        // Without a url there is nothing to bind to; fail with a usage
        // message instead of a NullPointerException further down.
        if( server.url == null ) {
            System.out.println("Invalid usage: the -url option is required.");
            return;
        }

        System.out.println();
        System.out.println("Server starting with the following options: ");
        System.out.println(" url="+server.getUrl());
        System.out.println(" sampleInterval="+server.getSampleInterval());
        System.out.println(" requestDelay="+server.getRequestDelay());
        System.out.println();
        server.run();
    }

    /**
     * Prints the current number of active connections and the amount of data
     * echoed since the previous sample, then resets the echoed bytes counter
     * so that the next sample starts from zero.
     */
    private void printSampleData() {
        System.out.println("Active connections: "+activeConnectionsCounter.getCount());
        System.out.println("Echoed bytes: " + (echoedBytesCounter.getCount()/1024f) + " kb");
        echoedBytesCounter.reset();
    }

    /**
     * Binds the server to the configured url, accepts connections and prints
     * sample data every <code>sampleInterval</code> milliseconds until the
     * shutdown latch is released or the thread is interrupted.
     *
     * The bound server is always disposed before this method returns, even
     * when starting or stopping fails or the wait is interrupted. If the
     * thread was interrupted, its interrupt status is restored on exit.
     */
    public void run() {
        AsyncChannelServer server = null;
        boolean interrupted = false;
        try {
            activeConnectionsCounter.reset();
            echoedBytesCounter.reset();
            shutdownCountDownLatch = new CountDownLatch(1);

            ChannelFactory factory = new ChannelFactory();
            server = factory.bindAsyncChannel(url);
            System.out.println("Server accepting connections on: "+server.getConnectURI());
            server.setAcceptListener(this);
            server.start();

            // await() returns false on timeout, so this loop prints one
            // sample per interval until the latch is counted down.
            while(!shutdownCountDownLatch.await(sampleInterval, TimeUnit.MILLISECONDS)) {
                printSampleData();
            }

            System.out.println("Stopping server.");
            server.stop();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored once cleanup is done.
            interrupted = true;
        } finally {
            // Release the bound server on every exit path.
            if( server != null ) {
                server.dispose();
            }
            if( interrupted ) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the url the server binds to as a string, or null if no url
     *         has been set yet.
     */
    public String getUrl() {
        return url == null ? null : url.toString();
    }

    /**
     * @param url the url the server should bind to.
     * @throws URISyntaxException if the given string is not a valid URI
     */
    public void setUrl(String url) throws URISyntaxException {
        this.url = new URI(url);
    }

    /**
     * Handles a single accepted connection: every received packet is written
     * straight back to the peer. Creating a handler increments the active
     * connection counter; disposing it decrements the counter again.
     */
    class ServerConnectionHandler implements AsyncChannelListener {

        private final AsyncChannel asyncChannel;
        /** Guards against disposing the channel and decrementing the counter twice. */
        private boolean disposed;

        /**
         * @param asyncChannel the channel of the accepted connection
         */
        public ServerConnectionHandler(AsyncChannel asyncChannel) {
            this.asyncChannel = asyncChannel;
            activeConnectionsCounter.increment();
        }

        /**
         * Echoes the packet back to the peer, optionally after sleeping for
         * <code>requestDelay</code> milliseconds. The end-of-stream packet
         * disposes the connection instead.
         *
         * @param packet the packet received from the peer
         */
        public void onPacket(Packet packet) {

            if( packet == EOSPacket.EOS_PACKET ) {
                System.out.println("Peer disconnected.");
                dispose();
                return;
            }

            try {
                if( requestDelay > 0 ) {
                    Thread.sleep(requestDelay);
                }

                // Count the bytes before writing the packet back.
                echoedBytesCounter.add(packet.remaining());
                asyncChannel.write(packet);
                asyncChannel.flush();

            } catch (IOException e) {
                onPacketError(e);
            } catch (InterruptedException e) {
                System.out.println("Interrupted... Shutting down.");
                dispose();
                // Keep the interrupt visible to the code that owns this thread.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Prints the error and disposes the connection.
         *
         * @param error the error reported for this connection
         */
        public void onPacketError(IOException error) {
            error.printStackTrace();
            dispose();
        }

        /**
         * Disposes the channel and decrements the active connection counter.
         * Calls after the first one do nothing.
         */
        private void dispose() {
            if( !disposed ) {
                asyncChannel.dispose();
                activeConnectionsCounter.decrement();
                disposed=true;
            }
        }
    }

    /**
     * Adapts the accepted channel to an async channel, attaches a new
     * {@link ServerConnectionHandler} to it and starts it. A failure is
     * forwarded to {@link #onAcceptError(IOException)}.
     *
     * @param channel the newly accepted channel
     */
    public void onAccept(Channel channel) {
        try {
            AsyncChannel asyncChannel = SyncToAsyncChannel.adapt(channel);
            asyncChannel.setAsyncChannelListener(new ServerConnectionHandler(asyncChannel));
            asyncChannel.start();
        } catch (IOException e) {
            onAcceptError(e);
        }
    }

    /**
     * Prints the error and releases the shutdown latch, which makes
     * {@link #run()} stop the server.
     *
     * @param error the error raised while accepting a connection
     */
    public void onAcceptError(IOException error) {
        error.printStackTrace();
        shutdownCountDownLatch.countDown();
    }

    /**
     * @return Returns the requestDelay.
     */
    public long getRequestDelay() {
        return requestDelay;
    }

    /**
     * @param requestDelay The requestDelay to set.
     */
    public void setRequestDelay(long requestDelay) {
        this.requestDelay = requestDelay;
    }

    /**
     * @return Returns the sampleInterval.
     */
    public long getSampleInterval() {
        return sampleInterval;
    }

    /**
     * @param sampleInterval The sampleInterval to set.
     */
    public void setSampleInterval(long sampleInterval) {
        this.sampleInterval = sampleInterval;
    }
}