Adding hello-samza example for kinesis

Author: Aditya Toomula <atoomula@atoomula-ld1.linkedin.biz>

Reviewers: Jagadish<jagadish@apache.org>

Closes #30 from atoomula/kinesis
diff --git a/build.gradle b/build.gradle
index 9d1f543..21793c4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,6 +48,7 @@
     compile(group: 'org.schwering', name: 'irclib', version: '1.10')
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
+    compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION")
 
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
 
diff --git a/pom.xml b/pom.xml
index da7ec90..a933038 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,11 @@
       <version>${samza.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-aws</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.10.1.1</version>
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is a dummy key for instructional purposes. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ
+# Secret key below is a dummy key for instructional purposes. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A sample task which consumes messages from kinesis stream and logs the message content.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+    long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime();
+    LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs));
+  }
+}
\ No newline at end of file