Updates and fixes
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java
index d02a8be..b2e405b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datatorrent.contrib.rabbitmq;
 
 /**
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
index caee2eb..060c119 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datatorrent.contrib.rabbitmq;
 
 import java.io.IOException;
@@ -26,8 +41,6 @@
     try {
       channel.basicPublish(exchange, "", null, tuple);
     } catch (IOException e) {
-
-      logger.debug(e.toString());
       DTThrowable.rethrow(e);
     }   
   }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
index 1338bbd..56ecbf1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datatorrent.contrib.zmq;
 
 /**
diff --git a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java
index 599be3e..0348942 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datatorrent.contrib.zmq;
 
 /**
diff --git a/contrib/src/test/java/com/datatorrent/contrib/helper/CollectorModule.java b/contrib/src/test/java/com/datatorrent/contrib/helper/CollectorModule.java
new file mode 100644
index 0000000..1423ba6
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/CollectorModule.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.helper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+
+public class CollectorModule<T> extends BaseOperator
+{
+  public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<T>("collector", this);
+
+  public static class CollectorInputPort<T> extends DefaultInputPort<T>
+  {
+    public volatile static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
+    ArrayList<T> list;
+
+    final String id;
+
+    public CollectorInputPort(String id, Operator module)
+    {
+      super();
+      this.id = id;
+    }
+
+    @Override
+    public void process(T tuple)
+    {
+  //    System.out.print("collector process:"+tuple);
+      list.add(tuple);
+    }
+
+    @Override
+    public void setConnected(boolean flag)
+    {
+      if (flag) {
+        collections.put(id, list = new ArrayList<T>());
+      }
+    }
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/MessageQueueTestHelper.java b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
similarity index 70%
rename from contrib/src/test/java/com/datatorrent/contrib/testhelper/MessageQueueTestHelper.java
rename to contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
index 6d71bf0..11d01dd 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/MessageQueueTestHelper.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
@@ -1,4 +1,19 @@
-package com.datatorrent.contrib.testhelper;
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.helper;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/contrib/src/test/java/com/datatorrent/contrib/helper/SourceModule.java b/contrib/src/test/java/com/datatorrent/contrib/helper/SourceModule.java
new file mode 100644
index 0000000..c3ff189
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/SourceModule.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.helper;
+
+import java.util.HashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator.ActivationListener;
+
+public class SourceModule extends BaseOperator
+implements InputOperator, ActivationListener<OperatorContext>
+{
+  public final transient DefaultOutputPort<byte[]> outPort = new DefaultOutputPort<byte[]>();
+  transient ArrayBlockingQueue<byte[]> holdingBuffer;
+  int testNum;
+
+  private static org.slf4j.Logger logger;
+
+  public SourceModule()
+  {
+    logger =  LoggerFactory.getLogger(SourceModule.class);;
+  }
+
+  public SourceModule(org.slf4j.Logger loggerInstance)
+  {
+    logger = loggerInstance;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    holdingBuffer = new ArrayBlockingQueue<byte[]>(1024 * 1024);
+  }
+
+  public void emitTuple(byte[] message)
+  {
+    logger.debug("Emitting message {}", message);
+    outPort.emit(message);
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    for (int i = holdingBuffer.size(); i-- > 0;) {
+      emitTuple(holdingBuffer.poll());
+    }
+  }
+
+  @Override
+  public void activate(OperatorContext ctx)
+  {
+    for (int i = 0; i < testNum; i++) {
+      HashMap<String, Integer> dataMapa = new HashMap<String, Integer>();
+      dataMapa.put("a", 2);
+      holdingBuffer.add(dataMapa.toString().getBytes());
+
+      HashMap<String, Integer> dataMapb = new HashMap<String, Integer>();
+      dataMapb.put("b", 20);
+      holdingBuffer.add(dataMapb.toString().getBytes());
+
+      HashMap<String, Integer> dataMapc = new HashMap<String, Integer>();
+      dataMapc.put("c", 1000);
+      holdingBuffer.add(dataMapc.toString().getBytes());
+    }
+  }
+
+  public void setTestNum(int testNum)
+  {
+    this.testNum = testNum;
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  public void replayTuples(long windowId)
+  {
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java
index dd3419c..5e17a01 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java
@@ -15,10 +15,8 @@
  */
 package com.datatorrent.contrib.rabbitmq;
 
-import java.util.HashMap;
-import java.util.List;
-
 import org.junit.Test;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -26,15 +24,12 @@
  */
 public class RabbitMQInputOperatorBenchmark extends RabbitMQInputOperatorTest
 {
-  static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
+  private static final Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorBenchmark.class);
 
-  
   @Test
   public void testDag() throws Exception
   {
-    logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
-    final int testNum = 100000;
-    runTest(testNum);
+    runTest(100000);
     logger.debug("end of test");
   }
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 8c37b1f..6b9fe23 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -20,25 +20,30 @@
 import java.util.HashMap;
 import java.util.List;
 
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.CollectorModule;
-import com.datatorrent.contrib.testhelper.MessageQueueTestHelper;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.contrib.helper.CollectorModule;
+import com.datatorrent.contrib.helper.MessageQueueTestHelper;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.common.util.DTThrowable;
+
 /**
  *
  */
 public class RabbitMQInputOperatorTest
 {
-  protected static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
+  private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
   
   public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String>
   {
@@ -135,7 +140,6 @@
 
     new Thread("LocalClusterController")
     {
-      int cnt = 0;
       @Override
       public void run()
       {
@@ -146,6 +150,7 @@
             Thread.sleep(500);
           }
           publisher.generateMessages(testNum);
+          startTms = System.currentTimeMillis();
           while (System.currentTimeMillis() - startTms < timeout) {
             List<?> list = collector.inputPort.collections.get("collector");
             
@@ -158,18 +163,20 @@
           }
         }
         catch (IOException ex) {
-          logger.debug(ex.toString());
+          logger.error(ex.getMessage(), ex);
+          DTThrowable.rethrow(ex);
+        } catch (InterruptedException ex) {
+          DTThrowable.rethrow(ex);
+        } finally {
+          lc.shutdown();
         }
-        catch (InterruptedException ex) {
-        }
-        lc.shutdown();
       }
 
     }.start();
 
     lc.run();
 
-    logger.debug("collection size:" + collector.inputPort.collections.size() + " " +  collector.inputPort.collections.toString());
+    logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections);
 
     MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections);
   }  
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
index 21740d0..d9d9217 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
@@ -15,9 +15,6 @@
  */
 package com.datatorrent.contrib.rabbitmq;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
@@ -26,13 +23,12 @@
  */
 public class RabbitMQOutputOperatorBenchmark extends RabbitMQOutputOperatorTest
 {
+  private static org.slf4j.Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorBenchmark.class);
+
   @Test
-  public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception
+  public void testDag() throws Exception
   {
-    logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);
-    final int testNum = 100000;
-    runTest(testNum);
-    
+    runTest(100000);
     logger.debug("end of test");  
   }
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index cda1f2c..342245d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -20,27 +20,24 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import com.rabbitmq.client.*;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.contrib.helper.SourceModule;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.SourceModule;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
+
 /**
  *
  */
 public class RabbitMQOutputOperatorTest
 {
-  protected static org.slf4j.Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);
+  private static org.slf4j.Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);
 
   public class RabbitMQMessageReceiver
   {
@@ -156,14 +153,18 @@
     lc.runAsync();
     try {      
       Thread.sleep(1000);
-      while(receiver.count < testNum * 3)
+      long timeout = 10000L;
+      long startTms = System.currentTimeMillis();
+      while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout))
       {
         Thread.sleep(100);
       } 
     }
     catch (InterruptedException ex) {
+      Assert.fail(ex.getMessage());
+    } finally {
+      lc.shutdown();
     }
-    lc.shutdown();
 
     Assert.assertEquals("emitted value for testNum was ", testNum * 3, receiver.count);
     for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) {
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorInputPort.java b/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorInputPort.java
deleted file mode 100644
index 8f828c7..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorInputPort.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.datatorrent.contrib.testhelper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-
-public class CollectorInputPort<T> extends DefaultInputPort<T>
-{
-  public volatile static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
-  ArrayList<T> list;
-
-  final String id;
-
-  public CollectorInputPort(String id, Operator module)
-  {
-    super();
-    this.id = id;
-  }
-
-  @Override
-  public void process(T tuple)
-  {
-//    System.out.print("collector process:"+tuple);
-    list.add(tuple);
-  }
-
-  @Override
-  public void setConnected(boolean flag)
-  {
-    if (flag) {
-      collections.put(id, list = new ArrayList<T>());
-    }
-  }
-}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorModule.java b/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorModule.java
deleted file mode 100644
index b9c1c33..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorModule.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.datatorrent.contrib.testhelper;
-
-import com.datatorrent.api.BaseOperator;
-
-
-public class CollectorModule<T> extends BaseOperator
-{
-  public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<T>("collector", this);
-}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/SourceModule.java b/contrib/src/test/java/com/datatorrent/contrib/testhelper/SourceModule.java
deleted file mode 100644
index 44f7026..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/SourceModule.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package com.datatorrent.contrib.testhelper;
-
-import java.util.HashMap;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.BaseOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator.ActivationListener;
-
-public class SourceModule extends BaseOperator
-implements InputOperator, ActivationListener<OperatorContext>
-{
-  public final transient DefaultOutputPort<byte[]> outPort = new DefaultOutputPort<byte[]>();
-  transient ArrayBlockingQueue<byte[]> holdingBuffer;
-  int testNum;
-
-  private static org.slf4j.Logger logger;
-
-  public SourceModule()
-  {
-    logger =  LoggerFactory.getLogger(SourceModule.class);;
-  }
-
-  public SourceModule(org.slf4j.Logger loggerInstance)
-  {
-    logger = loggerInstance;
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    holdingBuffer = new ArrayBlockingQueue<byte[]>(1024 * 1024);
-  }
-
-  public void emitTuple(byte[] message)
-  {
-    logger.debug("Emmiting message " + message.toString());
-    outPort.emit(message);
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    for (int i = holdingBuffer.size(); i-- > 0;) {
-      emitTuple(holdingBuffer.poll());
-    }
-  }
-
-  @Override
-  public void activate(OperatorContext ctx)
-  {
-    for (int i = 0; i < testNum; i++) {
-      HashMap<String, Integer> dataMapa = new HashMap<String, Integer>();
-      dataMapa.put("a", 2);
-      holdingBuffer.add(dataMapa.toString().getBytes());
-
-      HashMap<String, Integer> dataMapb = new HashMap<String, Integer>();
-      dataMapb.put("b", 20);
-      holdingBuffer.add(dataMapb.toString().getBytes());
-
-      HashMap<String, Integer> dataMapc = new HashMap<String, Integer>();
-      dataMapc.put("c", 1000);
-      holdingBuffer.add(dataMapc.toString().getBytes());
-    }
-  }
-
-  public void setTestNum(int testNum)
-  {
-    this.testNum = testNum;
-  }
-
-  @Override
-  public void deactivate()
-  {
-  }
-
-  public void replayTuples(long windowId)
-  {
-  }
-}
-
-
-class SourceModule1 extends BaseOperator
-implements InputOperator, ActivationListener<OperatorContext>
-{
-  public final transient DefaultOutputPort<byte[]> outPort = new DefaultOutputPort<byte[]>();
-  transient ArrayBlockingQueue<byte[]> holdingBuffer;
-  int testNum;
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    holdingBuffer = new ArrayBlockingQueue<byte[]>(1024 * 1024);
-  }
-
-  public void emitTuple(byte[] message)
-  {
-    outPort.emit(message);
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    for (int i = holdingBuffer.size(); i-- > 0;) {
-      emitTuple(holdingBuffer.poll());
-    }
-  }
-
-  @Override
-  public void activate(OperatorContext ctx)
-  {
-    for (int i = 0; i < testNum; i++) {
-      HashMap<String, Integer> dataMapa = new HashMap<String, Integer>();
-      dataMapa.put("a", 2);
-      holdingBuffer.add(dataMapa.toString().getBytes());
-
-      HashMap<String, Integer> dataMapb = new HashMap<String, Integer>();
-      dataMapb.put("b", 20);
-      holdingBuffer.add(dataMapb.toString().getBytes());
-
-      HashMap<String, Integer> dataMapc = new HashMap<String, Integer>();
-      dataMapc.put("c", 1000);
-      holdingBuffer.add(dataMapc.toString().getBytes());
-    }
-  }
-
-  public void setTestNum(int testNum)
-  {
-    this.testNum = testNum;
-  }
-
-  @Override
-  public void deactivate()
-  {
-  }
-
-  public void replayTuples(long windowId)
-  {
-  }
-}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java
index 011c9da..7ea1179 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java
@@ -17,16 +17,13 @@
 
 import org.junit.Test;
 
-
-
 /**
  *
  */
 public class ZeroMQInputOperatorBenchmark extends ZeroMQInputOperatorTest
 {
   @Test
-  public void testDag() throws InterruptedException, Exception {
-    final int testNum = 2000000;
-    testHelper(testNum);
+  public void testDag() throws Exception {
+    testHelper(2000000);
   }
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
index 915c2e9..28df249 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
@@ -21,11 +21,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.contrib.helper.CollectorModule;
+import com.datatorrent.contrib.helper.MessageQueueTestHelper;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.CollectorModule;
-import com.datatorrent.contrib.testhelper.MessageQueueTestHelper;
+
+import com.datatorrent.common.util.DTThrowable;
 
 /**
  *
@@ -45,7 +48,7 @@
     LocalMode lma = LocalMode.newInstance();
     DAG dag = lma.getDAG();
 
-    final ZeroMQMessageGenerator publisher = new ZeroMQMessageGenerator(logger);
+    final ZeroMQMessageGenerator publisher = new ZeroMQMessageGenerator();
     publisher.setup();
 
     ZeroMQInputOperator generator = dag.addOperator("Generator", ZeroMQInputOperator.class);
@@ -83,33 +86,29 @@
             Thread.sleep(500);
           }
           Thread.sleep(1000);
-          while (true) {
+          startTms = System.currentTimeMillis();
+          while (System.currentTimeMillis() - startTms < timeout) {
             List<?> list = collector.inputPort.collections.get("collector");
-            
             if (list.size() < testNum * 3) {
               Thread.sleep(10);
-            }
-            
-            else {
+            } else {
               break;
             }			            
           }
         }
         catch (InterruptedException ex) {
+          DTThrowable.rethrow(ex);
+        } finally {
+          logger.debug("Shutting down..");
+          lc.shutdown();
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            DTThrowable.rethrow(e);
+          } finally {
+            publisher.teardown();
+          }
         }
-
-        logger.debug("Shutting down..");
-        lc.shutdown();
-
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-
-        publisher.teardown();
-
       }
     }.start();
 
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
index 81a0e75..dc0591d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
@@ -1,14 +1,33 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datatorrent.contrib.zmq;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
-import com.datatorrent.contrib.testhelper.MessageQueueTestHelper;
+import com.datatorrent.contrib.helper.MessageQueueTestHelper;
 
 class ZeroMQMessageGenerator {
+
+  private static final Logger logger = LoggerFactory.getLogger(ZeroMQMessageGenerator.class);
+
   private ZMQ.Context context;
   private ZMQ.Socket publisher;
   private ZMQ.Socket syncservice;
@@ -17,12 +36,6 @@
   String pubAddr = "tcp://*:5556";
   String syncAddr = "tcp://*:5557";
 
-  private static Logger logger;
-  public ZeroMQMessageGenerator(Logger loggerInstance)
-  {
-    logger = loggerInstance;
-  }
-
   public void setup()
   {
     context = ZMQ.context(1);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
index 06662c9..c38fe00 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
@@ -1,26 +1,38 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datatorrent.contrib.zmq;
 
 import java.util.HashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
 
 final class ZeroMQMessageReceiver implements Runnable
 {
+  private static final Logger logger = LoggerFactory.getLogger(ZeroMQMessageReceiver.class);
+
   public HashMap<String, Integer> dataMap = new HashMap<String, Integer>();
   public int count = 0;
   protected ZMQ.Context context;
   protected ZMQ.Socket subscriber;
   protected ZMQ.Socket syncclient;
   volatile boolean shutDown = false;
-  
-  private static org.slf4j.Logger logger;
-  
-  public ZeroMQMessageReceiver(org.slf4j.Logger loggerInstance)
-  {
-	logger =  loggerInstance;
-  }
-  
+
   public void setup()
   {
     context = ZMQ.context(1);
@@ -60,7 +72,7 @@
       int eq = str.indexOf('=');
       String key = str.substring(1, eq);
       int value = Integer.parseInt(str.substring(eq + 1, str.length() - 1));
-      logger.debug("\nsubscriber recv:" + str);
+      logger.debug("\nsubscriber recv: {}", str);
       dataMap.put(key, value);
       count++;
       logger.debug("out of loop.. ");
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java
index 15bc2ca..1c66f11 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java
@@ -16,6 +16,7 @@
 package com.datatorrent.contrib.zmq;
 
 import org.junit.Test;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -23,15 +24,13 @@
  */
 public class ZeroMQOutputOperatorBenchmark extends ZeroMQOutputOperatorTest
 {
+  private static final Logger logger = LoggerFactory.getLogger(ZeroMQOutputOperatorBenchmark.class);
+
   @Test
   public void testDag() throws Exception
   {
-    final int testNum = 2000000;
-
-    logger = LoggerFactory.getLogger(ZeroMQOutputOperatorTest.class);
-
+    int testNum = 2000000;
     runTest(testNum);
-    
     logger.debug(String.format("\nBenchmarked %d tuples", testNum * 3));
     logger.debug("end of test");
   }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
index bdd7d85..a7bc6e0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
@@ -22,10 +22,13 @@
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.contrib.helper.SourceModule;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.SourceModule;
+
+import com.datatorrent.common.util.DTThrowable;
 
 
 /**
@@ -60,7 +63,7 @@
     final LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(false);
 
-    final ZeroMQMessageReceiver receiver = new ZeroMQMessageReceiver(logger);
+    final ZeroMQMessageReceiver receiver = new ZeroMQMessageReceiver();
     receiver.setup();
     final Thread t = new Thread(receiver);
     t.start();
@@ -71,29 +74,30 @@
       {
         try {
           Thread.sleep(1000);
-          while (true) {
+          long timeout = 10000L;
+          long startTms = System.currentTimeMillis();
+          while (System.currentTimeMillis() - startTms < timeout) {
             if (receiver.count < testNum * 3) {
               Thread.sleep(10);
-            }
-            else {
+            } else {
               break;
             }
           }
         }
         catch (InterruptedException ex) {
+          DTThrowable.rethrow(ex);
+        } finally {
+          logger.debug("done...");
+          lc.shutdown();
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            DTThrowable.rethrow(e);
+          } finally {
+            t.interrupt();
+            receiver.teardown();
+          }
         }
-        logger.debug("done...");
-        lc.shutdown();
-        try {
-			Thread.sleep(1000);
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-        t.interrupt();
-        receiver.teardown();
-        
-                        
       }
     }.start();