TAJO-2014: TestRpcClientManager fails occasionally.
diff --git a/CHANGES b/CHANGES
index e3df45e..f7aba71 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@
BUG FIXES
+ TAJO-2014: TestRpcClientManager fails occasionally. (jinho)
+
TAJO-2000: BSTIndex can cause OOM. (jinho)
TAJO-1992: \set timezone in cli doesn't work because of casesensitive (DaeMyung)
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 08d877f..5d1ad26 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -26,6 +26,7 @@
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -140,22 +141,22 @@
for (RpcEventListener listener: listeners) {
listener.onBeforeShutdown(this);
}
-
+
try {
accepted.close();
- if(bootstrap != null) {
+ if (bootstrap != null) {
if (bootstrap.childGroup() != null) {
- bootstrap.childGroup().shutdownGracefully();
+ Future future = bootstrap.childGroup().shutdownGracefully();
if (waitUntilThreadsStop) {
- bootstrap.childGroup().terminationFuture().sync();
+ future.sync();
}
}
if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully();
+ Future future = bootstrap.group().shutdownGracefully();
if (waitUntilThreadsStop) {
- bootstrap.childGroup().terminationFuture().sync();
+ future.sync();
}
}
}
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 032cf35..67c5936 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -18,10 +18,10 @@
package org.apache.tajo.rpc;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.CommonsLoggerFactory;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.apache.commons.logging.Log;
@@ -106,7 +106,7 @@
public void channelRegistered(ChannelHandlerContext ctx) {
/* Register client to managed map */
clients.put(target.getKey(), target);
- target.getChannel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
+ ctx.channel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
}
@Override
@@ -210,7 +210,7 @@
}
}
- static class ClientCloseFutureListener implements GenericFutureListener {
+ static class ClientCloseFutureListener implements ChannelFutureListener {
private RpcConnectionKey key;
public ClientCloseFutureListener(RpcConnectionKey key) {
@@ -218,7 +218,7 @@
}
@Override
- public void operationComplete(Future future) throws Exception {
+ public void operationComplete(ChannelFuture future) throws Exception {
clients.remove(key);
}
}
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
index 865e5dd..2b50c1f 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -20,6 +20,8 @@
import org.apache.tajo.rpc.test.DummyProtocol;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.net.InetSocketAddress;
@@ -34,15 +36,27 @@
public class TestRpcClientManager {
+ static NettyServerBase server;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+ server = new AsyncRpcServer(DummyProtocol.class,
+ service, new InetSocketAddress("127.0.0.1", 0), 10);
+ server.start();
+
+ }
+
+ @AfterClass
+ public static void afterClass(){
+ server.shutdown(true);
+ }
+
@Test
public void testRaceCondition() throws Exception {
final int parallelCount = 50;
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), parallelCount);
- server.start();
try {
final InetSocketAddress address = server.getListenAddress();
final RpcClientManager manager = RpcClientManager.getInstance();
@@ -54,7 +68,7 @@
public void run() {
NettyClientBase client = null;
try {
- client = manager.getClient(address, DummyProtocol.class, false, new Properties());
+ client = manager.getClient(address, DummyProtocol.class, true, new Properties());
} catch (Throwable e) {
fail(e.getMessage());
}
@@ -68,10 +82,9 @@
future.get();
}
- NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false, new Properties());
+ NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, true, new Properties());
RpcClientManager.cleanup(clientBase);
} finally {
- server.shutdown(true);
executor.shutdown();
RpcClientManager.close();
}
@@ -79,10 +92,6 @@
@Test
public void testClientCloseEvent() throws Exception {
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 3);
- server.start();
RpcClientManager manager = RpcClientManager.getInstance();
try {
@@ -97,18 +106,13 @@
client.close();
assertFalse(RpcClientManager.contains(key));
} finally {
- server.shutdown(true);
RpcClientManager.close();
}
}
@Test
public void testClientCloseEventWithReconnect() throws Exception {
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 3);
- server.start();
- int repeat = 10;
+ int repeat = 100;
RpcClientManager manager = RpcClientManager.getInstance();
try {
@@ -133,46 +137,37 @@
assertFalse(RpcClientManager.contains(key));
}
} finally {
- server.shutdown(true);
RpcClientManager.close();
}
}
@Test
public void testUnManagedClient() throws Exception {
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 3);
- server.start();
RpcConnectionKey key =
new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
RpcClientManager.close();
RpcClientManager manager = RpcClientManager.getInstance();
- try {
- NettyClientBase client1 = manager.newClient(key, new Properties());
- assertTrue(client1.isConnected());
- assertFalse(RpcClientManager.contains(key));
+ NettyClientBase client1 = manager.newClient(key, new Properties());
+ assertTrue(client1.isConnected());
+ assertFalse(RpcClientManager.contains(key));
- NettyClientBase client2 = manager.newClient(key, new Properties());
- assertTrue(client2.isConnected());
- assertFalse(RpcClientManager.contains(key));
+ NettyClientBase client2 = manager.newClient(key, new Properties());
+ assertTrue(client2.isConnected());
+ assertFalse(RpcClientManager.contains(key));
- assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
- assertNotEquals(client1.getChannel(), client2.getChannel());
+ assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
+ assertNotEquals(client1.getChannel(), client2.getChannel());
- client1.close();
- assertFalse(client1.isConnected());
- assertTrue(client2.isConnected());
+ client1.close();
+ assertFalse(client1.isConnected());
+ assertTrue(client2.isConnected());
- client1.connect();
- client2.close();
- assertFalse(client2.isConnected());
- assertTrue(client1.isConnected());
+ client1.connect();
+ client2.close();
+ assertFalse(client2.isConnected());
+ assertTrue(client1.isConnected());
- RpcClientManager.cleanup(client1, client2);
- } finally {
- server.shutdown(true);
- }
+ RpcClientManager.cleanup(client1, client2);
}
}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index 627e4ed..b11aa5a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -101,17 +101,17 @@
assertEquals(4,i);
}
- @Test
+ @Test(timeout = 60000)
public void testGetSplit() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
- cluster.waitClusterUp();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -158,16 +158,16 @@
}
}
- @Test
+ @Test(timeout = 60000)
public void testZeroLengthSplit() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
- cluster.waitClusterUp();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -207,17 +207,17 @@
}
}
- @Test
+ @Test(timeout = 60000)
public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
- cluster.waitClusterUp();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -258,17 +258,17 @@
}
}
- @Test
+ @Test(timeout = 60000)
public void testGetFileTablespace() throws Exception {
final Configuration hdfsConf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ hdfsConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build();
- cluster.waitClusterUp();
+ new MiniDFSCluster.Builder(hdfsConf).numDataNodes(1).build();
URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");
Optional<Tablespace> existingTs = Optional.absent();
@@ -287,13 +287,7 @@
space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace");
assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
-
} finally {
-
- if (existingTs.isPresent()) {
- TablespaceManager.addTableSpaceForTest(existingTs.get());
- }
-
cluster.shutdown(true);
}
}
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
index bae81a9..179c9eb 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
@@ -100,6 +100,7 @@
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
@@ -110,7 +111,6 @@
builder.waitSafeMode(true);
cluster = builder.build();
- cluster.waitClusterUp();
dfs = cluster.getFileSystem();
localFs = FileSystem.getLocal(new TajoConf());
}
@@ -168,7 +168,7 @@
return writeRowBlock(conf, meta, rowBlock, outputFile);
}
- @Test
+ @Test(timeout = 60000)
public void testRWForAllTypesWithNextTuple() throws IOException {
int rowNum = 10000;
@@ -198,7 +198,7 @@
assertEquals(rowNum, j);
}
- @Test
+ @Test(timeout = 60000)
public void testRepeatedScan() throws IOException {
int rowNum = 2;
@@ -226,7 +226,7 @@
reader.close();
}
- @Test
+ @Test(timeout = 60000)
public void testReset() throws IOException {
int rowNum = 2;