blob: 142fa7063a63275a1d875a25e254383089fc5ae0 [file] [log] [blame]
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activeio.oneport;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import junit.framework.TestCase;
import org.apache.activeio.AcceptListener;
import org.apache.activeio.Channel;
import org.apache.activeio.adapter.AsyncToSyncChannel;
import org.apache.activeio.adapter.SyncToAsyncChannelFactory;
import org.apache.activeio.oneport.HttpRecognizer;
import org.apache.activeio.oneport.IIOPRecognizer;
import org.apache.activeio.oneport.OnePortAsyncChannelServer;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.packet.async.AsyncChannel;
import org.apache.activeio.packet.async.AsyncChannelFactory;
import org.apache.activeio.packet.async.AsyncChannelServer;
import org.apache.activeio.packet.async.FilterAsyncChannelServer;
import org.apache.activeio.packet.sync.FilterSyncChannel;
import org.apache.activeio.packet.sync.SyncChannel;
import org.apache.activeio.packet.sync.socket.SocketSyncChannelFactory;
import org.apache.activeio.stream.sync.socket.SocketMetadata;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class OnePortAsyncChannelServerTest extends TestCase {
static private Log log = LogFactory.getLog(OnePortAsyncChannelServerTest.class);
static public AtomicInteger serverPacketCounter = new AtomicInteger(0);
public OnePortAsyncChannelServer server;
public AsyncChannelServer httpServer;
public AsyncChannelServer iiopServer;
public SocketSyncChannelFactory channelFactory;
public BlockingQueue resultSlot = new ArrayBlockingQueue(1);
public void testIIOPAccept() throws Exception {
serverPacketCounter.set(0);
hitIIOPServer();
String type = (String) resultSlot.poll(10, TimeUnit.SECONDS);
assertEquals("IIOP", type);
// Verify that a request when through the one port.
assertTrue(serverPacketCounter.get()>0);
}
public void testHttpAccept() throws IOException, URISyntaxException, InterruptedException {
serverPacketCounter.set(0);
hitHttpServer();
String type = (String) resultSlot.poll(60, TimeUnit.SECONDS);
assertEquals("HTTP", type);
// Verify that a request when through the one port.
assertTrue(serverPacketCounter.get()>0);
}
protected void hitHttpServer() throws IOException, MalformedURLException {
URI connectURI = server.getConnectURI();
String url = "http://" + connectURI.getHost() + ":" + connectURI.getPort() + "/index.action";
log.info(url);
InputStream is = new URL(url).openStream();
StringBuffer b = new StringBuffer();
int c;
while ((c = is.read()) >= 0) {
b.append((char) c);
}
log.info("HTTP response: " + b);
}
protected void hitIIOPServer() throws Exception {
SyncChannel channel = channelFactory.openSyncChannel(server.getConnectURI());
((SocketMetadata)channel.getAdapter(SocketMetadata.class)).setTcpNoDelay(true);
channel.write(new ByteArrayPacket("GIOPcrapcrap".getBytes("UTF-8")));
channel.flush();
channel.stop();
}
public void testUnknownAccept() throws IOException, URISyntaxException, InterruptedException {
SyncChannel channel = channelFactory.openSyncChannel(server.getConnectURI());
((SocketMetadata)channel.getAdapter(SocketMetadata.class)).setTcpNoDelay(true);
channel
.write(new ByteArrayPacket("Licensed under the Apache License, Version 2.0 (the \"License\")"
.getBytes("UTF-8")));
channel.flush();
String type = (String) resultSlot.poll(1000 * 5, TimeUnit.MILLISECONDS);
assertNull(type);
channel.dispose();
}
protected void setUp() throws Exception {
channelFactory = new SocketSyncChannelFactory();
ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory(){
int count=0;
public Thread newThread(Runnable arg0) {
return new Thread(arg0, "activeio:"+(count++));
}});
AsyncChannelFactory factory = SyncToAsyncChannelFactory.adapt(channelFactory,executor);
AsyncChannelServer cs = factory.bindAsyncChannel(new URI("tcp://localhost:0"));
cs = new FilterAsyncChannelServer(cs) {
public void onAccept(Channel channel) {
SyncChannel syncChannel = AsyncToSyncChannel.adapt(channel);
super.onAccept(new FilterSyncChannel(syncChannel) {
public org.apache.activeio.packet.Packet read(long timeout) throws IOException {
Packet packet = super.read(timeout);
if( packet!=null && packet.hasRemaining() )
serverPacketCounter.incrementAndGet();
return packet;
}
});
}
};
server = new OnePortAsyncChannelServer(cs);
server.start();
startHTTPServer();
startIIOPServer();
log.info("Running on: "+server.getConnectURI());
}
/**
* @throws IOException
* @throws NamingException
*/
protected void startIIOPServer() throws Exception {
iiopServer = server.bindAsyncChannel(IIOPRecognizer.IIOP_RECOGNIZER);
iiopServer.setAcceptListener(new AcceptListener() {
public void onAccept(Channel channel) {
try {
log.info("Got a IIOP connection.");
resultSlot.offer("IIOP", 1, TimeUnit.MILLISECONDS);
channel.dispose();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void onAcceptError(IOException error) {
}
});
iiopServer.start();
}
/**
* @throws IOException
* @throws Exception
*/
protected void startHTTPServer() throws Exception {
httpServer = server.bindAsyncChannel(HttpRecognizer.HTTP_RECOGNIZER);
httpServer.setAcceptListener(new AcceptListener() {
public void onAccept(Channel channel) {
try {
log.info("Got a HTTP connection.");
resultSlot.offer("HTTP", 1, TimeUnit.MILLISECONDS);
byte data[] = ("HTTP/1.1 200 OK\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "\r\n"
+ "Hello World").getBytes("UTF-8");
((SocketMetadata)channel.getAdapter(SocketMetadata.class)).setTcpNoDelay(true);
((AsyncChannel) channel).write(new ByteArrayPacket(data));
channel.dispose();
} catch (Throwable e) {
e.printStackTrace();
}
}
public void onAcceptError(IOException error) {
}
});
httpServer.start();
}
protected void tearDown() throws Exception {
stopIIOPServer();
stopHTTPServer();
server.dispose();
}
/**
* @throws InterruptedException
*
*/
protected void stopHTTPServer() throws InterruptedException {
httpServer.dispose();
}
/**
* @throws Exception
*
*/
protected void stopIIOPServer() throws Exception {
iiopServer.dispose();
}
}