Merge branch 'latest'

Conflicts:
	README.md
	bin/deploy.sh
	bin/grid
	build.gradle
	gradle.properties
	pom.xml
	src/main/assembly/src.xml
	src/main/config/pageview-adclick-joiner.properties
	src/main/config/pageview-filter.properties
	src/main/config/pageview-sessionizer.properties
	src/main/config/tumbling-pageview-counter.properties
	src/main/config/wikipedia-application-local-runner.properties
	src/main/config/wikipedia-application.properties
	src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java
	src/main/java/samza/examples/cookbook/PageViewFilterApp.java
	src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java
	src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java
	src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
diff --git a/.reviewboardrc b/.reviewboardrc
index a989cb2..dc119ba 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -19,4 +19,4 @@
 REPOSITORY = 'samza-hello-samza'
 GUESS_DESCRIPTION = True
 TARGET_GROUPS = 'samza'
-TRACKING_BRANCH = 'origin/latest'
+TRACKING_BRANCH = 'origin/master'
diff --git a/README.md b/README.md
index 0f80e9e..dd676dc 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,89 @@
 hello-samza
 ===========
 
-Hello Samza is a starter project for [Apache Samza](http://samza.apache.org/) jobs.
+**Hello Samza** is a starter project for [Apache Samza](http://samza.apache.org/) jobs.
 
-Please see [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) to get started.
-
-### Pull requests and questions
+### About
 
 [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) is developed as part of the [Apache Samza](http://samza.apache.org) project. Please direct questions, improvements and bug fixes there. Questions about [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) are welcome on the [dev list](http://samza.apache.org/community/mailing-lists.html) and the [Samza JIRA](https://issues.apache.org/jira/browse/SAMZA) has a hello-samza component for filing tickets.
 
+### Instructions
+
+The **Hello Samza** project contains example Samza applications written with both the high-level and the low-level APIs. The instructions below install the binaries and run the applications on a local YARN cluster. See also [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) and [Hello Samza High Level API](http://samza.apache.org/learn/tutorials/latest/hello-samza-high-level-yarn.html) for more information.
+
+#### 1. Get the Code
+
+Check out the hello-samza project:
+
+```
+git clone https://git.apache.org/samza-hello-samza.git hello-samza
+cd hello-samza
+```
+
+To build hello-samza with the latest Samza master, you can switch to the _latest_ branch:
+
+```
+git checkout latest
+```
+
+This project contains everything you'll need to run your first Samza application.
+
+#### 2. Start a Grid
+
+A Samza grid usually comprises three different systems: [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html), [Kafka](http://kafka.apache.org/), and [ZooKeeper](http://zookeeper.apache.org/). The hello-samza project comes with a script called "grid" to help you set up these systems. Start by running:
+
+```
+./bin/grid bootstrap
+```
+
+This command will download, install, and start ZooKeeper, Kafka, and YARN. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called "deploy" inside hello-samza's root folder.
+
+If you get a complaint that _JAVA_HOME_ is not set, then you'll need to set it to the path where Java is installed on your system.
+
+Once the grid command completes, you can verify that YARN is up and running by going to [http://localhost:8088](http://localhost:8088). This is the YARN UI.
+
+#### 3. Build a Samza Application Package
+
+Before you can run a Samza application, you need to build a package for it. This package is what YARN uses to deploy your apps on the grid. Use the following command in the hello-samza project to build and deploy the example applications:
+
+```
+./bin/deploy.sh
+```
+
+#### 4. Run a Samza Application
+
+After you've built your Samza package, you can start the example applications on the grid.
+
+##### - High-level API Examples
+
+Package [samza.examples.cookbook](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook) contains various examples of high-level API operator usage, such as map, partitionBy, window and join. Each example is a runnable Samza application with the steps in the class javadocs, e.g., [PageViewAdClickJoiner](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java).
+
+Package [samza.examples.wikipedia.application](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application) contains a small Samza application which consumes the real-time feeds from Wikipedia, extracts the metadata of the events, and calculates statistics of all edits in a 10-second window. You can start the app on the grid using the run-app.sh script:
+
+```
+./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application.properties
+```
+
+Once the job has started, you can tail the Kafka topic with:
+
+```
+./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wikipedia-stats
+```
+
+A code walkthrough of this application can be found [here](http://samza.apache.org/learn/tutorials/latest/hello-samza-high-level-code.html).
+
+##### - Low-level API Examples
+
+Package [samza.examples.wikipedia.task](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/task) contains the low-level API Samza code for the Wikipedia example. To run it, use the following scripts:
+
+```
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-stats.properties
+```
+
+Once the jobs have started, you can use the same _kafka-console-consumer.sh_ command as in the high-level API Wikipedia example to inspect the statistics output.
+
 ### Contribution
 
 To start contributing on [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) first read [Rules](http://samza.apache.org/contribute/rules.html) and [Contributor Corner](https://cwiki.apache.org/confluence/display/SAMZA/Contributor%27s+Corner). Notice that [Hello Samza](http://samza.apache.org/startup/hello-samza/0.13/) git repository does not support git pull request.
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 51faed1..9526067 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@
 
 mvn clean package
 mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-0.13.1-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-0.14.0-dist.tar.gz -C $base_dir/deploy/samza
diff --git a/bin/grid b/bin/grid
index 8c5b7dd..f31824b 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@
 COMMAND=$1
 SYSTEM=$2
 
-DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
 DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
@@ -79,7 +79,6 @@
 }
 
 install_samza() {
-  echo "Building samza from master..."
   mkdir -p "$DEPLOY_ROOT_DIR"
   if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
     pushd "$DOWNLOAD_CACHE_DIR/samza"
@@ -113,7 +112,7 @@
 
 install_kafka() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
+  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.2.1
   # have to use SIGTERM since nohup on appears to ignore SIGINT
   # and Kafka switched to SIGINT in KAFKA-1031.
   sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
diff --git a/build.gradle b/build.gradle
index 9d1f543..21793c4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,6 +48,7 @@
     compile(group: 'org.schwering', name: 'irclib', version: '1.10')
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
+    compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION")
 
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
 
diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
index 9028590..6c48260 100644
--- a/conf/yarn-site.xml
+++ b/conf/yarn-site.xml
@@ -30,4 +30,8 @@
     <name>yarn.resourcemanager.hostname</name>
     <value>127.0.0.1</value>
   </property>
+  <property>
+    <name>yarn.nodemanager.delete.debug-delay.sec</name>
+    <value>86400</value>
+  </property>
 </configuration>
diff --git a/gradle.properties b/gradle.properties
index f14b8f7..e207baa 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-SAMZA_VERSION=0.14.1-SNAPSHOT
+SAMZA_VERSION=0.14.0
 KAFKA_VERSION=0.10.1.1
 HADOOP_VERSION=2.6.1
 
diff --git a/pom.xml b/pom.xml
index dbfe749..2065d7a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>0.14.1-SNAPSHOT</version>
+  <version>0.14.0</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -84,6 +84,11 @@
       <version>${samza.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-aws</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.10.1.1</version>
@@ -148,7 +153,7 @@
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>0.14.1-SNAPSHOT</samza.version>
+    <samza.version>0.14.0</samza.version>
     <hadoop.version>2.6.1</hadoop.version>
   </properties>
 
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f3694e..c04ace0 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -48,10 +48,6 @@
       <source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
       <outputDirectory>bin</outputDirectory>
     </file>
-    <file>
-      <source>${basedir}/bin/run-azure-application.sh</source>
-      <outputDirectory>bin</outputDirectory>
-    </file>
   </files>
   <dependencySets>
     <dependencySet>
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is a dummy key for instructional purposes. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ
+# Secret key below is a dummy key for instructional purposes. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/config/pageview-profile-table-joiner.properties b/src/main/config/pageview-profile-table-joiner.properties
deleted file mode 100644
index 7cec601..0000000
--- a/src/main/config/pageview-profile-table-joiner.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=pageview-profile-table-joiner
-job.container.count=2
-job.default.system=kafka
-job.coordinator.system=kafka
-
-# YARN
-yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
-
-# Task
-app.class=samza.examples.cookbook.PageViewProfileTableJoiner
-
-# Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.default.stream.replication.factor=1
\ No newline at end of file
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+  /**
+   * A unique identifier for the ad
+   */
+  private final String adId;
+  /**
+   * The user that clicked the ad
+   */
+  private final String userId;
+  /**
+   * The id of the page that the ad was served from
+   */
+  private final String pageId;
+
+  public AdClick(String message) {
+    String[] adClickFields = message.split(",");
+    this.adId = adClickFields[0];
+    this.userId = adClickFields[1];
+    this.pageId = adClickFields[2];
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a page view event.
+ */
+class PageView {
+  /**
+   * The user that viewed the page
+   */
+  private final String userId;
+  /**
+   * The region that the page was viewed from
+   */
+  private final String country;
+  /**
+   * A trackingId for the page
+   */
+  private final String pageId;
+
+  /**
+   * Constructs a {@link PageView} from the provided string.
+   *
+   * @param message in the following CSV format - userId,country,pageId
+   */
+  PageView(String message) {
+    String[] pageViewFields = message.split(",");
+    userId = pageViewFields[0];
+    country = pageViewFields[1];
+    pageId = pageViewFields[2];
+  }
+
+  String getUserId() {
+    return userId;
+  }
+
+  String getCountry() {
+    return country;
+  }
+
+  String getPageId() {
+    return pageId;
+  }
+}
diff --git a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java b/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java
deleted file mode 100644
index 86deb61..0000000
--- a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
-import org.apache.samza.table.Table;
-
-import samza.examples.cookbook.data.PageView;
-import samza.examples.cookbook.data.Profile;
-
-/**
- * In this example, we join a stream of Page views with a table of user profiles, which is populated from an
- * user profile stream. For instance, this is helpful for analysis that required additional information from
- * user's profile.
- *
- * <p> Concepts covered: Performing stream-to-table joins.
- *
- * To run the below example:
- *
- * <ol>
- *   <li>
- *     Ensure that the topics "pageview-join-input", "profile-table-input" are created  <br/>
- *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create --topic pageview-join-input --partitions 2 --replication-factor 1
- *     ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create --topic profile-table-input --partitions 2 --replication-factor 1
- *   </li>
- *   <li>
- *     Run the application using the run-app.sh script <br/>
- *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/pageview-profile-table-joiner.properties
- *   </li>
- *   <li>
- *     Produce some messages to the "profile-table-input" topic with the same userId <br/>
- *     ./deploy/kafka/bin/kafka-console-producer.sh --topic profile-table-input --broker-list localhost:9092 <br/>
- *     {"userId": "user1", "company": "LNKD"} <br/>
- *     {"userId": "user2", "company": "MSFT"}
- *   </li>
- *   <li>
- *     Produce some messages to the "pageview-join-input" topic <br/>
- *     ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-join-input --broker-list localhost:9092 <br/>
- *     {"userId": "user1", "country": "india", "pageId":"google.com"} <br/>
- *     {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
- *   </li>
- *   <li>
- *     Consume messages from the "enriched-pageview-join-output" topic <br/>
- *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic enriched-pageview-join-output
- *   </li>
- * </ol>
- *
- */
-public class PageViewProfileTableJoiner implements StreamApplication {
-
-  private static final String PROFILE_TOPIC = "profile-table-input";
-  private static final String PAGEVIEW_TOPIC = "pageview-join-input";
-  private static final String OUTPUT_TOPIC = "enriched-pageview-join-output";
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-
-    Serde<Profile> profileSerde = new JsonSerdeV2<>(Profile.class);
-    Serde<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class);
-
-    OutputStream<EnrichedPageView> joinResultStream = graph.getOutputStream(
-        OUTPUT_TOPIC, new JsonSerdeV2<>(EnrichedPageView.class));
-
-    Table profileTable = graph.getTable(new RocksDbTableDescriptor<String, Profile>("profile-table")
-        .withSerde(KVSerde.of(new StringSerde(), profileSerde)));
-
-    graph.getInputStream(PROFILE_TOPIC, profileSerde)
-        .map(profile -> KV.of(profile.userId, profile))
-        .sendTo(profileTable);
-
-    graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde)
-        .partitionBy(pv -> pv.userId, pv -> pv, new KVSerde(new StringSerde(), pageViewSerde), "join")
-        .join(profileTable, new JoinFn())
-        .sendTo(joinResultStream);
-  }
-
-  private class JoinFn implements StreamTableJoinFunction<String, KV<String, PageView>, KV<String, Profile>, EnrichedPageView> {
-    @Override
-    public EnrichedPageView apply(KV<String, PageView> message, KV<String, Profile> record) {
-      return record == null ? null :
-          new EnrichedPageView(message.getKey(), record.getValue().company, message.getValue().pageId);
-    }
-    @Override
-    public String getMessageKey(KV<String, PageView> message) {
-      return message.getKey();
-    }
-    @Override
-    public String getRecordKey(KV<String, Profile> record) {
-      return record.getKey();
-    }
-  }
-
-  static public class EnrichedPageView {
-
-    public final String userId;
-    public final String company;
-    public final String pageId;
-
-    public EnrichedPageView(String userId, String company, String pageId) {
-      this.userId = userId;
-      this.company = company;
-      this.pageId = pageId;
-    }
-  }
-
-}
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A sample task that consumes messages from a Kinesis stream and logs each message's key and arrival lag.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+    long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime();
+    LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs));
+  }
+}
\ No newline at end of file