APEXMALHAR-2431 Create Kinesis Input operator which emits byte array as a tuple
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java
new file mode 100644
index 0000000..04e80b6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kinesis;
+
+import java.nio.ByteBuffer;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+/**
+ * Kinesis input adapter which consumes records from kinesis streams and emits in ByteArray form.
+ *
+ * @category Input
+ * @tags Kinesis, input, ByteArray
+ */
+
+public class KinesisByteArrayInputOperator extends AbstractKinesisInputOperator<byte[]>
+{
+ /**
+ * Implement abstract method of AbstractKinesisInputOperator
+ */
+ @Override
+ public byte[] getTuple(Record record)
+ {
+ try {
+ ByteBuffer bb = record.getData();
+ byte[] bytes = new byte[bb.remaining()];
+ bb.get(bytes);
+ return bytes;
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
index faffbda..a0eb042 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
@@ -182,6 +182,55 @@
lc.shutdown();
}
+ @Test
+ public void testKinesisByteArrayInputOperator() throws Exception
+ {
+ int totalCount = 10;
+ // initial the latch for this test
+ latch = new CountDownLatch(1);
+
+ // Start producer
+ KinesisTestProducer p = new KinesisTestProducer(streamName);
+ p.setSendCount(totalCount);
+ p.setBatchSize(9);
+ new Thread(p).start();
+
+ // Create DAG for testing.
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ // Create KinesisByteArrayInputOperator and set some properties with respect to consumer.
+ KinesisByteArrayInputOperator node = dag.addOperator("Kinesis message consumer", KinesisByteArrayInputOperator.class);
+ node.setAccessKey(credentials.getCredentials().getAWSSecretKey());
+ node.setSecretKey(credentials.getCredentials().getAWSAccessKeyId());
+ KinesisConsumer consumer = new KinesisConsumer();
+ consumer.setStreamName(streamName);
+ consumer.setRecordsLimit(totalCount);
+ node.setConsumer(consumer);
+
+ // Create Test tuple collector
+ CollectorModule<byte[]> collector = dag.addOperator("TestMessageCollector", new CollectorModule<byte[]>());
+
+ // Connect ports
+ dag.addStream("Kinesis message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ lc.runAsync();
+
+ // Wait 45s for consumer finish consuming all the messages
+ latch.await(45000, TimeUnit.MILLISECONDS);
+
+ // Check results
+ Assert.assertEquals("Collections size", 1, collections.size());
+ Assert.assertEquals("Tuple count", totalCount, collections.get(collector.inputPort.id).size());
+ logger.debug(String.format("Number of emitted tuples: %d", collections.get(collector.inputPort.id).size()));
+
+ lc.shutdown();
+ }
+
@Override
@After
public void afterTest()