APEXMALHAR-2431 Create Kinesis Input operator which emits byte array as a tuple
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java
new file mode 100644
index 0000000..04e80b6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kinesis;
+
+import java.nio.ByteBuffer;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+/**
+ * Kinesis input adapter which consumes records from kinesis streams and emits in ByteArray form.
+ *
+ * @category Input
+ * @tags Kinesis, input, ByteArray
+ */
+
+public class KinesisByteArrayInputOperator extends AbstractKinesisInputOperator<byte[]>
+{
+  /**
+   * Implement abstract method of AbstractKinesisInputOperator
+   */
+  @Override
+  public byte[] getTuple(Record record)
+  {
+    try {
+      ByteBuffer bb = record.getData();
+      byte[] bytes = new byte[bb.remaining()];
+      bb.get(bytes);
+      return bytes;
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
index faffbda..a0eb042 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
@@ -182,6 +182,55 @@
     lc.shutdown();
   }
 
+  @Test
+  public void testKinesisByteArrayInputOperator() throws Exception
+  {
+    int totalCount = 10;
+    // initial the latch for this test
+    latch = new CountDownLatch(1);
+
+    // Start producer
+    KinesisTestProducer p = new KinesisTestProducer(streamName);
+    p.setSendCount(totalCount);
+    p.setBatchSize(9);
+    new Thread(p).start();
+
+    // Create DAG for testing.
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // Create KinesisByteArrayInputOperator and set some properties with respect to consumer.
+    KinesisByteArrayInputOperator node = dag.addOperator("Kinesis message consumer", KinesisByteArrayInputOperator.class);
+    node.setAccessKey(credentials.getCredentials().getAWSSecretKey());
+    node.setSecretKey(credentials.getCredentials().getAWSAccessKeyId());
+    KinesisConsumer consumer = new KinesisConsumer();
+    consumer.setStreamName(streamName);
+    consumer.setRecordsLimit(totalCount);
+    node.setConsumer(consumer);
+
+    // Create Test tuple collector
+    CollectorModule<byte[]> collector = dag.addOperator("TestMessageCollector", new CollectorModule<byte[]>());
+
+    // Connect ports
+    dag.addStream("Kinesis message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    lc.runAsync();
+
+    // Wait 45s for consumer finish consuming all the messages
+    latch.await(45000, TimeUnit.MILLISECONDS);
+
+    // Check results
+    Assert.assertEquals("Collections size", 1, collections.size());
+    Assert.assertEquals("Tuple count", totalCount, collections.get(collector.inputPort.id).size());
+    logger.debug(String.format("Number of emitted tuples: %d", collections.get(collector.inputPort.id).size()));
+
+    lc.shutdown();
+  }
+
   @Override
   @After
   public void afterTest()