blob: 1cab817a3179e7e1cf994834862c6fca5ce15a42 [file] [log] [blame]
package com.gemstone.gemfire.distributed.internal.tcpserver;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.TestCase;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.PoolStatHelper;
import com.gemstone.gemfire.distributed.internal.SharedConfiguration;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpHandler;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
import com.gemstone.gemfire.internal.AvailablePort;
//import com.gemstone.org.jgroups.stack.GossipClient;
//import com.gemstone.org.jgroups.stack.IpAddress;
public class TcpServerJUnitDisabledTest extends TestCase {
protected/*GemStoneAddition*/ InetAddress localhost;
protected/*GemStoneAddition*/ int port;
private SimpleStats stats;
private TcpServer server;
public void start(TcpHandler handler) throws IOException {
localhost = InetAddress.getLocalHost();
port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
stats = new SimpleStats();
server = new TcpServer(port, localhost , new Properties(), null, handler, stats, Thread.currentThread().getThreadGroup(), "server thread");
server.start();
}
public void test() throws UnknownHostException, IOException, ClassNotFoundException, InterruptedException {
EchoHandler handler = new EchoHandler();
start(handler);
TestObject test = new TestObject();
test.id = 5;
TestObject result = (TestObject) TcpClient.requestToServer(localhost, port, test, 60 * 1000 );
Assert.assertEquals(test.id, result.id);
String[] info = TcpClient.getInfo(localhost, port);
Assert.assertNotNull(info);
Assert.assertTrue(info.length > 1);
try {
TcpClient.stop(localhost, port);
} catch ( ConnectException ignore ) {
// must not be running
}
server.join(60 * 1000);
Assert.assertFalse(server.isAlive());
Assert.assertTrue(handler.shutdown);
Assert.assertEquals(3, stats.started.get());
Assert.assertEquals(3, stats.ended.get());
}
public void testConcurrency() throws UnknownHostException, IOException, ClassNotFoundException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
DelayHandler handler = new DelayHandler(latch);
start(handler);
final AtomicBoolean done = new AtomicBoolean();
Thread delayedThread = new Thread() {
public void run() {
Boolean delay = Boolean.valueOf(true);
try {
TcpClient.requestToServer(localhost, port, delay, 60 * 1000 );
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
done.set(true);
}
};
delayedThread.start();
try {
Thread.sleep(500);
Assert.assertFalse(done.get());
TcpClient.requestToServer(localhost, port, Boolean.valueOf(false), 60 * 1000 );
Assert.assertFalse(done.get());
latch.countDown();
Thread.sleep(500);
Assert.assertTrue(done.get());
} finally {
latch.countDown();
delayedThread.join(60 * 1000);
Assert.assertTrue(!delayedThread.isAlive()); // GemStoneAddition
try {
TcpClient.stop(localhost, port);
} catch ( ConnectException ignore ) {
// must not be running
}
server.join(60 * 1000);
}
}
public static class TestObject implements DataSerializable {
int id;
public TestObject() {
}
public void fromData(DataInput in) throws IOException {
id = in.readInt();
}
public void toData(DataOutput out) throws IOException {
out.writeInt(id);
}
}
protected/*GemStoneAddition*/ static class EchoHandler implements TcpHandler {
protected/*GemStoneAddition*/ boolean shutdown;
public void init(TcpServer tcpServer) {
// TODO Auto-generated method stub
}
public Object processRequest(Object request) throws IOException {
return request;
}
public void shutDown() {
shutdown = true;
}
public void restarting(DistributedSystem ds, GemFireCache cache, SharedConfiguration sharedConfig) { }
public void endRequest(Object request,long startTime) { }
public void endResponse(Object request,long startTime) { }
}
private static class DelayHandler implements TcpHandler {
private CountDownLatch latch;
public DelayHandler(CountDownLatch latch) {
this.latch = latch;
}
public void init(TcpServer tcpServer) {
}
public Object processRequest(Object request) throws IOException {
Boolean delay = (Boolean) request;
if(delay.booleanValue()) {
try {
latch.await(120 * 1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return delay;
}
else {
return delay;
}
}
public void shutDown() {
}
public void restarting(DistributedSystem ds, GemFireCache cache, SharedConfiguration sharedConfig) { }
public void endRequest(Object request,long startTime) { }
public void endResponse(Object request,long startTime) { }
}
protected/*GemStoneAddition*/ static class SimpleStats implements PoolStatHelper {
AtomicInteger started = new AtomicInteger();
AtomicInteger ended = new AtomicInteger();
public void endJob() {
started.incrementAndGet();
}
public void startJob() {
ended.incrementAndGet();
}
}
}