Merge latest with master
diff --git a/.reviewboardrc b/.reviewboardrc
index a989cb2..dc119ba 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -19,4 +19,4 @@
 REPOSITORY = 'samza-hello-samza'
 GUESS_DESCRIPTION = True
 TARGET_GROUPS = 'samza'
-TRACKING_BRANCH = 'origin/latest'
+TRACKING_BRANCH = 'origin/master'
diff --git a/README.md b/README.md
index 81c7624..975a2c6 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,83 @@
 mvn test -Dtest=<ClassName>
 ```
 
+### Instructions
+
+The **Hello Samza** project contains example Samza applications of high-level API as well as low-level API. The following are the instructions to install the binaries and run the applications in a local Yarn cluster. See also [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) and [Hello Samza High Level API](http://samza.apache.org/learn/tutorials/latest/hello-samza-high-level-yarn.html) for more information.
+
+#### 1. Get the Code
+
+Check out the hello-samza project:
+
+```
+git clone https://git.apache.org/samza-hello-samza.git hello-samza
+cd hello-samza
+```
+
+To build hello-samza with the latest Samza master, you can switch to the _latest_ branch:
+
+```
+git checkout latest
+```
+
+This project contains everything you'll need to run your first Samza application.
+
+#### 2. Start a Grid
+
+A Samza grid usually comprises three different systems: [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html), [Kafka](http://kafka.apache.org/), and [ZooKeeper](http://zookeeper.apache.org/). The hello-samza project comes with a script called "grid" to help you setup these systems. Start by running:
+
+```
+./bin/grid bootstrap
+```
+
+This command will download, install, and start ZooKeeper, Kafka, and YARN. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called "deploy" inside hello-samza's root folder.
+
+If you get a complaint that _JAVA_HOME_ is not set, then you'll need to set it to the path where Java is installed on your system.
+
+Once the grid command completes, you can verify that YARN is up and running by going to [http://localhost:8088](http://localhost:8088). This is the YARN UI.
+
+#### 3. Build a Samza Application Package
+
+Before you can run a Samza application, you need to build a package for it. This package is what YARN uses to deploy your apps on the grid. Use the following command in hello-samza project to build and deploy the example applications:
+
+```
+./bin/deploy.sh
+```
+
+#### 4. Run a Samza Application
+
+After you've built your Samza package, you can start the example applications on the grid.
+
+##### - High-level API Examples
+
+Package [samza.examples.cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook) contains various examples of high-level API operator usage, such as map, partitionBy, window and join. Each example is a runnable Samza application with the steps in the class javadocs, e.g [PageViewAdClickJoiner](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java).
+
+Package [samza.examples.wikipedia.application](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application) contains a small Samza application which consumes the real-time feeds from Wikipedia, extracts the metadata of the events, and calculates statistics of all edits in a 10-second window. You can start the app on the grid using the run-app.sh script:
+
+```
+./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application.properties
+```
+
+Once the job is started, we can tail the kafka topic by:
+
+```
+./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wikipedia-stats
+```
+
+A code walkthrough of this application can be found [here](http://samza.apache.org/learn/tutorials/latest/hello-samza-high-level-code.html).
+
+##### - Low-level API Examples
+
+Package [samza.examples.wikipedia.task](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/task) contains the low-level API Samza code for the Wikipedia example. To run it, use the following scripts:
+
+```
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-stats.properties
+```
+
+Once the jobs are started, you can use the same _kafka-console-consumer.sh_ command as in the high-level API Wikipedia example to check out the output of the statistics.
+
 ### Contribution
 
 To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/latest/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/latest/) git repository does not support git pull request.
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 3c3ada2..d6ce59c 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@
 
 mvn clean package
 mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.0-dist.tar.gz -C $base_dir/deploy/samza
diff --git a/bin/grid b/bin/grid
index 8c5b7dd..f31824b 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@
 COMMAND=$1
 SYSTEM=$2
 
-DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
 DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
@@ -79,7 +79,6 @@
 }
 
 install_samza() {
-  echo "Building samza from master..."
   mkdir -p "$DEPLOY_ROOT_DIR"
   if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
     pushd "$DOWNLOAD_CACHE_DIR/samza"
@@ -113,7 +112,7 @@
 
 install_kafka() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
+  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.2.1
   # have to use SIGTERM since nohup on appears to ignore SIGINT
   # and Kafka switched to SIGINT in KAFKA-1031.
   sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
diff --git a/build.gradle b/build.gradle
index 80dafea..143995d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -56,6 +56,7 @@
     compile(group: 'org.schwering', name: 'irclib', version: '1.10')
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
+
     compile(group: 'org.apache.samza', name: 'samza-test_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
@@ -64,6 +65,7 @@
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
+
     runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.kafka', name: 'kafka_2.11', version: "$KAFKA_VERSION")
diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
index 9028590..6c48260 100644
--- a/conf/yarn-site.xml
+++ b/conf/yarn-site.xml
@@ -30,4 +30,8 @@
     <name>yarn.resourcemanager.hostname</name>
     <value>127.0.0.1</value>
   </property>
+  <property>
+    <name>yarn.nodemanager.delete.debug-delay.sec</name>
+    <value>86400</value>
+  </property>
 </configuration>
diff --git a/pom.xml b/pom.xml
index 41ff462..230fb50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>1.0.0-SNAPSHOT</version>
+  <version>1.0.0</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -132,6 +132,11 @@
       <version>${samza.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-aws</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.11.0.2</version>
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is a dummy key for instructional purposes. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ
+# Secret key below is a dummy key for instructional purposes. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/config/pageview-filter-sql.properties b/src/main/config/pageview-filter-sql.properties
new file mode 100644
index 0000000..49a4271
--- /dev/null
+++ b/src/main/config/pageview-filter-sql.properties
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewFilterSqlApp
+app.runner.class=org.apache.samza.sql.runner.SamzaSqlApplicationRunner
+
+# Avro schema files used in the sql command.
+schema.files=file://${basedir}/src/main/schemas/OutputTopic.avsc,file://${basedir}/src/main/schemas/PageViewStream.avsc
+
+# Samza sql configs
+samza.sql.stmt=insert into kafka.ouputTopic select id, Name from PageViewStream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=avro
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=1
diff --git a/src/main/java/samza/examples/avro/AvroSerDeFactory.java b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
new file mode 100644
index 0000000..c96eedd
--- /dev/null
+++ b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
@@ -0,0 +1,73 @@
+package samza.examples.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
+public class AvroSerDeFactory implements SerdeFactory {
+
+  public static String CFG_AVRO_SCHEMA = "serializers.avro.schema";
+
+  @Override
+  public Serde getSerde(String name, Config config) {
+    return new AvroSerDe(config);
+  }
+
+  private class AvroSerDe implements Serde {
+    private final Schema schema;
+
+    public AvroSerDe(Config config) {
+      schema = Schema.parse(config.get(CFG_AVRO_SCHEMA));
+    }
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      GenericRecord record;
+      try {
+        record = genericRecordFromBytes(bytes, schema);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to deserialize the record", e);
+      }
+      return record;
+    }
+
+    @Override
+    public byte[] toBytes(Object o) {
+      GenericRecord record = (GenericRecord) o;
+      try {
+        return encodeAvroGenericRecord(schema, record);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to serialize the record", e);
+      }
+    }
+  }
+
+  public byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException {
+    DatumWriter<IndexedRecord> msgDatumWriter = new GenericDatumWriter<>(schema);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    msgDatumWriter.write(record, encoder);
+    encoder.flush();
+    return os.toByteArray();
+  }
+
+  private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException {
+    BinaryDecoder binDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+    GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
+    return reader.read(null, binDecoder);
+  }
+}
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+  /*
+   * An unique identifier for the ad
+   */
+  private final String adId;
+  /**
+   * The user that clicked the ad
+   */
+  private final String userId;
+  /**
+   * The id of the page that the ad was served from
+   */
+  private final String pageId;
+
+  public AdClick(String message) {
+    String[] adClickFields = message.split(",");
+    this.adId = adClickFields[0];
+    this.userId = adClickFields[1];
+    this.pageId = adClickFields[2];
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a Page view event
+ */
+class PageView {
+  /**
+   * The user that viewed the page
+   */
+  private final String userId;
+  /**
+   * The region that the page was viewed from
+   */
+  private final String country;
+  /**
+   * A trackingId for the page
+   */
+  private final String pageId;
+
+  /**
+   * Constructs a {@link PageView} from the provided string.
+   *
+   * @param message in the following CSV format - userId,country,url
+   */
+  PageView(String message) {
+    String[] pageViewFields = message.split(",");
+    userId = pageViewFields[0];
+    country = pageViewFields[1];
+    pageId = pageViewFields[2];
+  }
+
+  String getUserId() {
+    return userId;
+  }
+
+  String getCountry() {
+    return country;
+  }
+
+  String getPageId() {
+    return pageId;
+  }
+}
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
new file mode 100644
index 0000000..de01969
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
@@ -0,0 +1,72 @@
+package samza.examples.cookbook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.sql.runner.SamzaSqlApplication;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+
+
+/**
+ * In this example, we demonstrate how to use SQL to create a samza job.
+ *
+ * <p>Concepts covered: Using sql to perform Stream processing.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "PageViewStream" is created  <br/>
+ *     ./kafka-topics.sh  --zookeeper localhost:2181 --create --topic PageViewStream --partitions 1 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-filter-sql.properties)
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "PageViewStream" topic <br/>
+ *     Please follow instructions at https://github.com/srinipunuru/samzasqltools on how to produce events into PageViewStream<br/>
+ *   </li>
+ *   <li>
+ *     Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh)
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic outputTopic <br/>
+ *     --property print.key=true    </li>
+ * </ol>
+ *
+ */
+
+public class PageViewFilterSqlApp extends SamzaSqlApplication {
+
+  public static final String CFG_SCHEMA_FILES = "schema.files";
+  private static final String CFG_SCHEMA_VALUE_FMT = "";
+
+  @Override
+  public void init(StreamGraph streamGraph, Config config) {
+    String sqlStmt = "insert into kafka.NewLinkedInEmployees select id, Name from ProfileChangeStream";
+    String schemaFiles = config.get(CFG_SCHEMA_FILES);
+    HashMap<String, String> newConfig = new HashMap<>();
+    newConfig.putAll(config);
+    populateSchemaConfigs(schemaFiles, newConfig);
+    newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sqlStmt);
+    super.init(streamGraph, new MapConfig(newConfig));
+  }
+
+  private void populateSchemaConfigs(String schemaFilesValue, HashMap<String, String> config) {
+    String[] schemaFiles = schemaFilesValue.split(",");
+    for (String schemaFileValue : schemaFiles) {
+      try {
+        File schemaFile = new File(schemaFileValue);
+        String schemaValue = Schema.parse(schemaFile).toString();
+        config.put(String.format(CFG_SCHEMA_VALUE_FMT, schemaFile.getName()), schemaValue);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to parse the schemaFile " + schemaFileValue, e);
+      }
+    }
+  }
+}
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A sample task which consumes messages from kinesis stream and logs the message content.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+    long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime();
+    LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs));
+  }
+}
\ No newline at end of file
diff --git a/src/main/schemas/OutputTopic.avsc b/src/main/schemas/OutputTopic.avsc
new file mode 100644
index 0000000..7670b1b
--- /dev/null
+++ b/src/main/schemas/OutputTopic.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+  "name": "SimpleRecord",
+  "version" : 1,
+  "namespace": "org.apache.samza.sql.system.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "id",
+      "doc": "Record id.",
+      "type": ["null", "int"],
+      "default":null
+    },
+    {
+      "name": "Name",
+      "doc" : "Some name.",
+      "type": ["null", "string"],
+      "default":null
+    }
+  ]
+}
diff --git a/src/main/schemas/PageViewStream.avsc b/src/main/schemas/PageViewStream.avsc
new file mode 100644
index 0000000..54936f7
--- /dev/null
+++ b/src/main/schemas/PageViewStream.avsc
@@ -0,0 +1,32 @@
+{
+  "name": "PageViewEvent",
+  "version" : 1,
+  "namespace": "com.linkedin.samza.tools.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "id",
+      "doc": "Record id.",
+      "type": ["null", "int"],
+      "default":null
+    },
+    {
+      "name": "Name",
+      "doc": "Name of the profile.",
+      "type": ["null", "string"],
+      "default":null
+    },
+    {
+      "name": "ViewerName",
+      "doc": "Name of the person who viewed the profile.",
+      "type": ["null", "string"],
+      "default":null
+    },
+    {
+      "name": "ProfileViewTimestamp",
+      "doc": "Time at which the profile was viewed.",
+      "type": ["null", "long"],
+      "default":null
+    }
+  ]
+}