S4-7: Modified TCPEmitter and TCPListener - Fixed removeChannel() to make sure Channel.close() is not called on the same channel by two different threads - Not using OrderedMemoryAwareThreadPool anymore (doesn't guarantee ordering on partitions) - Using default thread pool to send messages; SendQueue is augmented to ensure a single thread processes one queue - Shortened the synchronized blocks as much as possible - Removed unnecessary handling code
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 93176ab..98e7aa1 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -110,9 +110,15 @@
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
logger.error("Error", event.getCause());
- if (context.getChannel().isOpen()) {
+ Channel c = context.getChannel();
+ if (c.isOpen()) {
logger.error("Closing channel due to exception");
- context.getChannel().close();
+ try {
+ if (c.close().await().isSuccess())
+ channels.remove(c);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
}
}
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
index d255137..db9b596 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
@@ -16,7 +16,7 @@
startThreads();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 4; i++) {
Thread.sleep(500);
logger.debug("Messages sent so far - {}", util.sendThread.sendCounts);
((TCPEmitter) util.emitter).removeChannel(0);