Updates and fixes
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java
index d02a8be..b2e405b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.datatorrent.contrib.rabbitmq;
/**
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
index caee2eb..060c119 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.datatorrent.contrib.rabbitmq;
import java.io.IOException;
@@ -26,8 +41,6 @@
try {
channel.basicPublish(exchange, "", null, tuple);
} catch (IOException e) {
-
- logger.debug(e.toString());
DTThrowable.rethrow(e);
}
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
index 1338bbd..56ecbf1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.datatorrent.contrib.zmq;
/**
diff --git a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java
index 599be3e..0348942 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.datatorrent.contrib.zmq;
/**
diff --git a/contrib/src/test/java/com/datatorrent/contrib/helper/CollectorModule.java b/contrib/src/test/java/com/datatorrent/contrib/helper/CollectorModule.java
new file mode 100644
index 0000000..1423ba6
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/CollectorModule.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.helper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+
+public class CollectorModule<T> extends BaseOperator
+{
+ public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<T>("collector", this);
+
+ public static class CollectorInputPort<T> extends DefaultInputPort<T>
+ {
+ public volatile static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
+ ArrayList<T> list;
+
+ final String id;
+
+ public CollectorInputPort(String id, Operator module)
+ {
+ super();
+ this.id = id;
+ }
+
+ @Override
+ public void process(T tuple)
+ {
+ // System.out.print("collector process:"+tuple);
+ list.add(tuple);
+ }
+
+ @Override
+ public void setConnected(boolean flag)
+ {
+ if (flag) {
+ collections.put(id, list = new ArrayList<T>());
+ }
+ }
+ }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/MessageQueueTestHelper.java b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
similarity index 70%
rename from contrib/src/test/java/com/datatorrent/contrib/testhelper/MessageQueueTestHelper.java
rename to contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
index 6d71bf0..11d01dd 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/MessageQueueTestHelper.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
@@ -1,4 +1,19 @@
-package com.datatorrent.contrib.testhelper;
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.helper;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/contrib/src/test/java/com/datatorrent/contrib/helper/SourceModule.java b/contrib/src/test/java/com/datatorrent/contrib/helper/SourceModule.java
new file mode 100644
index 0000000..c3ff189
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/SourceModule.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.helper;
+
+import java.util.HashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator.ActivationListener;
+
+public class SourceModule extends BaseOperator
+implements InputOperator, ActivationListener<OperatorContext>
+{
+ public final transient DefaultOutputPort<byte[]> outPort = new DefaultOutputPort<byte[]>();
+ transient ArrayBlockingQueue<byte[]> holdingBuffer;
+ int testNum;
+
+ private static org.slf4j.Logger logger;
+
+ public SourceModule()
+ {
+ logger = LoggerFactory.getLogger(SourceModule.class);;
+ }
+
+ public SourceModule(org.slf4j.Logger loggerInstance)
+ {
+ logger = loggerInstance;
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ holdingBuffer = new ArrayBlockingQueue<byte[]>(1024 * 1024);
+ }
+
+ public void emitTuple(byte[] message)
+ {
+ logger.debug("Emitting message {}", message);
+ outPort.emit(message);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ for (int i = holdingBuffer.size(); i-- > 0;) {
+ emitTuple(holdingBuffer.poll());
+ }
+ }
+
+ @Override
+ public void activate(OperatorContext ctx)
+ {
+ for (int i = 0; i < testNum; i++) {
+ HashMap<String, Integer> dataMapa = new HashMap<String, Integer>();
+ dataMapa.put("a", 2);
+ holdingBuffer.add(dataMapa.toString().getBytes());
+
+ HashMap<String, Integer> dataMapb = new HashMap<String, Integer>();
+ dataMapb.put("b", 20);
+ holdingBuffer.add(dataMapb.toString().getBytes());
+
+ HashMap<String, Integer> dataMapc = new HashMap<String, Integer>();
+ dataMapc.put("c", 1000);
+ holdingBuffer.add(dataMapc.toString().getBytes());
+ }
+ }
+
+ public void setTestNum(int testNum)
+ {
+ this.testNum = testNum;
+ }
+
+ @Override
+ public void deactivate()
+ {
+ }
+
+ public void replayTuples(long windowId)
+ {
+ }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java
index dd3419c..5e17a01 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorBenchmark.java
@@ -15,10 +15,8 @@
*/
package com.datatorrent.contrib.rabbitmq;
-import java.util.HashMap;
-import java.util.List;
-
import org.junit.Test;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
@@ -26,15 +24,12 @@
*/
public class RabbitMQInputOperatorBenchmark extends RabbitMQInputOperatorTest
{
- static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
+ private static final Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorBenchmark.class);
-
@Test
public void testDag() throws Exception
{
- logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
- final int testNum = 100000;
- runTest(testNum);
+ runTest(100000);
logger.debug("end of test");
}
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 8c37b1f..6b9fe23 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -20,25 +20,30 @@
import java.util.HashMap;
import java.util.List;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.CollectorModule;
-import com.datatorrent.contrib.testhelper.MessageQueueTestHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.contrib.helper.CollectorModule;
+import com.datatorrent.contrib.helper.MessageQueueTestHelper;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.common.util.DTThrowable;
+
/**
*
*/
public class RabbitMQInputOperatorTest
{
- protected static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
+ private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String>
{
@@ -135,7 +140,6 @@
new Thread("LocalClusterController")
{
- int cnt = 0;
@Override
public void run()
{
@@ -146,6 +150,7 @@
Thread.sleep(500);
}
publisher.generateMessages(testNum);
+ startTms = System.currentTimeMillis();
while (System.currentTimeMillis() - startTms < timeout) {
List<?> list = collector.inputPort.collections.get("collector");
@@ -158,18 +163,20 @@
}
}
catch (IOException ex) {
- logger.debug(ex.toString());
+ logger.error(ex.getMessage(), ex);
+ DTThrowable.rethrow(ex);
+ } catch (InterruptedException ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ lc.shutdown();
}
- catch (InterruptedException ex) {
- }
- lc.shutdown();
}
}.start();
lc.run();
- logger.debug("collection size:" + collector.inputPort.collections.size() + " " + collector.inputPort.collections.toString());
+ logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections);
MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections);
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
index 21740d0..d9d9217 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
@@ -15,9 +15,6 @@
*/
package com.datatorrent.contrib.rabbitmq;
-import java.io.IOException;
-import java.net.MalformedURLException;
-
import org.junit.Test;
import org.slf4j.LoggerFactory;
@@ -26,13 +23,12 @@
*/
public class RabbitMQOutputOperatorBenchmark extends RabbitMQOutputOperatorTest
{
+ private static org.slf4j.Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorBenchmark.class);
+
@Test
- public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception
+ public void testDag() throws Exception
{
- logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);
- final int testNum = 100000;
- runTest(testNum);
-
+ runTest(100000);
logger.debug("end of test");
}
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index cda1f2c..342245d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -20,27 +20,24 @@
import java.util.HashMap;
import java.util.Map;
+import com.rabbitmq.client.*;
+
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;
+import com.datatorrent.contrib.helper.SourceModule;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.SourceModule;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
+
/**
*
*/
public class RabbitMQOutputOperatorTest
{
- protected static org.slf4j.Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);
+ private static org.slf4j.Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperatorTest.class);
public class RabbitMQMessageReceiver
{
@@ -156,14 +153,18 @@
lc.runAsync();
try {
Thread.sleep(1000);
- while(receiver.count < testNum * 3)
+ long timeout = 10000L;
+ long startTms = System.currentTimeMillis();
+ while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout))
{
Thread.sleep(100);
}
}
catch (InterruptedException ex) {
+ Assert.fail(ex.getMessage());
+ } finally {
+ lc.shutdown();
}
- lc.shutdown();
Assert.assertEquals("emitted value for testNum was ", testNum * 3, receiver.count);
for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) {
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorInputPort.java b/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorInputPort.java
deleted file mode 100644
index 8f828c7..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorInputPort.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.datatorrent.contrib.testhelper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-
-public class CollectorInputPort<T> extends DefaultInputPort<T>
-{
- public volatile static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
- ArrayList<T> list;
-
- final String id;
-
- public CollectorInputPort(String id, Operator module)
- {
- super();
- this.id = id;
- }
-
- @Override
- public void process(T tuple)
- {
-// System.out.print("collector process:"+tuple);
- list.add(tuple);
- }
-
- @Override
- public void setConnected(boolean flag)
- {
- if (flag) {
- collections.put(id, list = new ArrayList<T>());
- }
- }
-}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorModule.java b/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorModule.java
deleted file mode 100644
index b9c1c33..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/CollectorModule.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.datatorrent.contrib.testhelper;
-
-import com.datatorrent.api.BaseOperator;
-
-
-public class CollectorModule<T> extends BaseOperator
-{
- public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<T>("collector", this);
-}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/testhelper/SourceModule.java b/contrib/src/test/java/com/datatorrent/contrib/testhelper/SourceModule.java
deleted file mode 100644
index 44f7026..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/testhelper/SourceModule.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package com.datatorrent.contrib.testhelper;
-
-import java.util.HashMap;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.BaseOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator.ActivationListener;
-
-public class SourceModule extends BaseOperator
-implements InputOperator, ActivationListener<OperatorContext>
-{
- public final transient DefaultOutputPort<byte[]> outPort = new DefaultOutputPort<byte[]>();
- transient ArrayBlockingQueue<byte[]> holdingBuffer;
- int testNum;
-
- private static org.slf4j.Logger logger;
-
- public SourceModule()
- {
- logger = LoggerFactory.getLogger(SourceModule.class);;
- }
-
- public SourceModule(org.slf4j.Logger loggerInstance)
- {
- logger = loggerInstance;
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- holdingBuffer = new ArrayBlockingQueue<byte[]>(1024 * 1024);
- }
-
- public void emitTuple(byte[] message)
- {
- logger.debug("Emmiting message " + message.toString());
- outPort.emit(message);
- }
-
- @Override
- public void emitTuples()
- {
- for (int i = holdingBuffer.size(); i-- > 0;) {
- emitTuple(holdingBuffer.poll());
- }
- }
-
- @Override
- public void activate(OperatorContext ctx)
- {
- for (int i = 0; i < testNum; i++) {
- HashMap<String, Integer> dataMapa = new HashMap<String, Integer>();
- dataMapa.put("a", 2);
- holdingBuffer.add(dataMapa.toString().getBytes());
-
- HashMap<String, Integer> dataMapb = new HashMap<String, Integer>();
- dataMapb.put("b", 20);
- holdingBuffer.add(dataMapb.toString().getBytes());
-
- HashMap<String, Integer> dataMapc = new HashMap<String, Integer>();
- dataMapc.put("c", 1000);
- holdingBuffer.add(dataMapc.toString().getBytes());
- }
- }
-
- public void setTestNum(int testNum)
- {
- this.testNum = testNum;
- }
-
- @Override
- public void deactivate()
- {
- }
-
- public void replayTuples(long windowId)
- {
- }
-}
-
-
-class SourceModule1 extends BaseOperator
-implements InputOperator, ActivationListener<OperatorContext>
-{
- public final transient DefaultOutputPort<byte[]> outPort = new DefaultOutputPort<byte[]>();
- transient ArrayBlockingQueue<byte[]> holdingBuffer;
- int testNum;
-
- @Override
- public void setup(OperatorContext context)
- {
- holdingBuffer = new ArrayBlockingQueue<byte[]>(1024 * 1024);
- }
-
- public void emitTuple(byte[] message)
- {
- outPort.emit(message);
- }
-
- @Override
- public void emitTuples()
- {
- for (int i = holdingBuffer.size(); i-- > 0;) {
- emitTuple(holdingBuffer.poll());
- }
- }
-
- @Override
- public void activate(OperatorContext ctx)
- {
- for (int i = 0; i < testNum; i++) {
- HashMap<String, Integer> dataMapa = new HashMap<String, Integer>();
- dataMapa.put("a", 2);
- holdingBuffer.add(dataMapa.toString().getBytes());
-
- HashMap<String, Integer> dataMapb = new HashMap<String, Integer>();
- dataMapb.put("b", 20);
- holdingBuffer.add(dataMapb.toString().getBytes());
-
- HashMap<String, Integer> dataMapc = new HashMap<String, Integer>();
- dataMapc.put("c", 1000);
- holdingBuffer.add(dataMapc.toString().getBytes());
- }
- }
-
- public void setTestNum(int testNum)
- {
- this.testNum = testNum;
- }
-
- @Override
- public void deactivate()
- {
- }
-
- public void replayTuples(long windowId)
- {
- }
-}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java
index 011c9da..7ea1179 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorBenchmark.java
@@ -17,16 +17,13 @@
import org.junit.Test;
-
-
/**
*
*/
public class ZeroMQInputOperatorBenchmark extends ZeroMQInputOperatorTest
{
@Test
- public void testDag() throws InterruptedException, Exception {
- final int testNum = 2000000;
- testHelper(testNum);
+ public void testDag() throws Exception {
+ testHelper(2000000);
}
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
index 915c2e9..28df249 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
@@ -21,11 +21,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.contrib.helper.CollectorModule;
+import com.datatorrent.contrib.helper.MessageQueueTestHelper;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.CollectorModule;
-import com.datatorrent.contrib.testhelper.MessageQueueTestHelper;
+
+import com.datatorrent.common.util.DTThrowable;
/**
*
@@ -45,7 +48,7 @@
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
- final ZeroMQMessageGenerator publisher = new ZeroMQMessageGenerator(logger);
+ final ZeroMQMessageGenerator publisher = new ZeroMQMessageGenerator();
publisher.setup();
ZeroMQInputOperator generator = dag.addOperator("Generator", ZeroMQInputOperator.class);
@@ -83,33 +86,29 @@
Thread.sleep(500);
}
Thread.sleep(1000);
- while (true) {
+ startTms = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTms < timeout) {
List<?> list = collector.inputPort.collections.get("collector");
-
if (list.size() < testNum * 3) {
Thread.sleep(10);
- }
-
- else {
+ } else {
break;
}
}
}
catch (InterruptedException ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ logger.debug("Shutting down..");
+ lc.shutdown();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ } finally {
+ publisher.teardown();
+ }
}
-
- logger.debug("Shutting down..");
- lc.shutdown();
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- publisher.teardown();
-
}
}.start();
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
index 81a0e75..dc0591d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
@@ -1,14 +1,33 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.datatorrent.contrib.zmq;
import java.util.ArrayList;
import java.util.HashMap;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
-import com.datatorrent.contrib.testhelper.MessageQueueTestHelper;
+import com.datatorrent.contrib.helper.MessageQueueTestHelper;
class ZeroMQMessageGenerator {
+
+ private static final Logger logger = LoggerFactory.getLogger(ZeroMQMessageGenerator.class);
+
private ZMQ.Context context;
private ZMQ.Socket publisher;
private ZMQ.Socket syncservice;
@@ -17,12 +36,6 @@
String pubAddr = "tcp://*:5556";
String syncAddr = "tcp://*:5557";
- private static Logger logger;
- public ZeroMQMessageGenerator(Logger loggerInstance)
- {
- logger = loggerInstance;
- }
-
public void setup()
{
context = ZMQ.context(1);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
index 06662c9..c38fe00 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
@@ -1,26 +1,38 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.datatorrent.contrib.zmq;
import java.util.HashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
final class ZeroMQMessageReceiver implements Runnable
{
+ private static final Logger logger = LoggerFactory.getLogger(ZeroMQMessageReceiver.class);
+
public HashMap<String, Integer> dataMap = new HashMap<String, Integer>();
public int count = 0;
protected ZMQ.Context context;
protected ZMQ.Socket subscriber;
protected ZMQ.Socket syncclient;
volatile boolean shutDown = false;
-
- private static org.slf4j.Logger logger;
-
- public ZeroMQMessageReceiver(org.slf4j.Logger loggerInstance)
- {
- logger = loggerInstance;
- }
-
+
public void setup()
{
context = ZMQ.context(1);
@@ -60,7 +72,7 @@
int eq = str.indexOf('=');
String key = str.substring(1, eq);
int value = Integer.parseInt(str.substring(eq + 1, str.length() - 1));
- logger.debug("\nsubscriber recv:" + str);
+ logger.debug("\nsubscriber recv: {}", str);
dataMap.put(key, value);
count++;
logger.debug("out of loop.. ");
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java
index 15bc2ca..1c66f11 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorBenchmark.java
@@ -16,6 +16,7 @@
package com.datatorrent.contrib.zmq;
import org.junit.Test;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
@@ -23,15 +24,13 @@
*/
public class ZeroMQOutputOperatorBenchmark extends ZeroMQOutputOperatorTest
{
+ private static final Logger logger = LoggerFactory.getLogger(ZeroMQOutputOperatorBenchmark.class);
+
@Test
public void testDag() throws Exception
{
- final int testNum = 2000000;
-
- logger = LoggerFactory.getLogger(ZeroMQOutputOperatorTest.class);
-
+ int testNum = 2000000;
runTest(testNum);
-
logger.debug(String.format("\nBenchmarked %d tuples", testNum * 3));
logger.debug("end of test");
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
index bdd7d85..a7bc6e0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
@@ -22,10 +22,13 @@
import org.junit.Test;
import org.slf4j.LoggerFactory;
+import com.datatorrent.contrib.helper.SourceModule;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.testhelper.SourceModule;
+
+import com.datatorrent.common.util.DTThrowable;
/**
@@ -60,7 +63,7 @@
final LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
- final ZeroMQMessageReceiver receiver = new ZeroMQMessageReceiver(logger);
+ final ZeroMQMessageReceiver receiver = new ZeroMQMessageReceiver();
receiver.setup();
final Thread t = new Thread(receiver);
t.start();
@@ -71,29 +74,30 @@
{
try {
Thread.sleep(1000);
- while (true) {
+ long timeout = 10000L;
+ long startTms = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTms < timeout) {
if (receiver.count < testNum * 3) {
Thread.sleep(10);
- }
- else {
+ } else {
break;
}
}
}
catch (InterruptedException ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ logger.debug("done...");
+ lc.shutdown();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ } finally {
+ t.interrupt();
+ receiver.teardown();
+ }
}
- logger.debug("done...");
- lc.shutdown();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- t.interrupt();
- receiver.teardown();
-
-
}
}.start();