Merge pull request #50 from apache/latest

diff --git a/.reviewboardrc b/.reviewboardrc
index a989cb2..dc119ba 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -19,4 +19,4 @@
 REPOSITORY = 'samza-hello-samza'
 GUESS_DESCRIPTION = True
 TARGET_GROUPS = 'samza'
-TRACKING_BRANCH = 'origin/latest'
+TRACKING_BRANCH = 'origin/master'
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 3c3ada2..d6ce59c 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@
 
 mvn clean package
 mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.0-dist.tar.gz -C $base_dir/deploy/samza
diff --git a/bin/grid b/bin/grid
index 638fd48..3cd0d30 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@
 COMMAND=$1
 SYSTEM=$2
 
-DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
 DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
 DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
 
@@ -79,7 +79,6 @@
 }
 
 install_samza() {
-  echo "Building samza from master..."
   mkdir -p "$DEPLOY_ROOT_DIR"
   if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
     pushd "$DOWNLOAD_CACHE_DIR/samza"
@@ -113,7 +112,7 @@
 
 install_kafka() {
   mkdir -p "$DEPLOY_ROOT_DIR"
-  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
+  install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.2.1
   # have to use SIGTERM since nohup on appears to ignore SIGINT
   # and Kafka switched to SIGINT in KAFKA-1031.
   sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
diff --git a/build.gradle b/build.gradle
index 80dafea..143995d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -56,6 +56,7 @@
     compile(group: 'org.schwering', name: 'irclib', version: '1.10')
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
+
     compile(group: 'org.apache.samza', name: 'samza-test_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
@@ -64,6 +65,7 @@
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
+
     runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.kafka', name: 'kafka_2.11', version: "$KAFKA_VERSION")
diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
index 9028590..6c48260 100644
--- a/conf/yarn-site.xml
+++ b/conf/yarn-site.xml
@@ -30,4 +30,8 @@
     <name>yarn.resourcemanager.hostname</name>
     <value>127.0.0.1</value>
   </property>
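+  <!-- Delay cleanup of finished containers' local dirs and logs (86400s = 1 day) to ease debugging -->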
+  <property>
+    <name>yarn.nodemanager.delete.debug-delay-sec</name>
+    <value>86400</value>
+  </property>
 </configuration>
diff --git a/pom.xml b/pom.xml
index a556d96..3974880 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>1.0.0-SNAPSHOT</version>
+  <version>1.0.0</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -132,6 +132,11 @@
       <version>${samza.version}</version>
     </dependency>
     <dependency>
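+      <!-- samza-aws provides the Kinesis system consumer used by the kinesis-hello-samza example -->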
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-aws</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.11.0.2</version>
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
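+# The Samza Kinesis consumer requires the AllSspToSingleTask grouper so that shard-to-task
+# assignment is handled by the Kinesis Client Library rather than by Samza's default grouper.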
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
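+# task.inputs takes the form <system-name>.<stream-name>; here the system is "kinesis".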
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is a dummy key for instructional purposes. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ
+# Secret key below is a dummy key for instructional purposes. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/java/samza/examples/avro/AvroSerDeFactory.java b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
new file mode 100644
index 0000000..c96eedd
--- /dev/null
+++ b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
@@ -0,0 +1,73 @@
+package samza.examples.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
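+/**
+ * A {@link SerdeFactory} that (de)serializes Avro {@link GenericRecord}s using the schema
+ * provided as a JSON string under the {@code serializers.avro.schema} config key.
+ */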
+public class AvroSerDeFactory implements SerdeFactory {
+
+  public static final String CFG_AVRO_SCHEMA = "serializers.avro.schema";
+
+  @Override
+  public Serde getSerde(String name, Config config) {
+    return new AvroSerDe(config);
+  }
+
+  private class AvroSerDe implements Serde {
+    private final Schema schema;
+
+    public AvroSerDe(Config config) {
+      schema = new Schema.Parser().parse(config.get(CFG_AVRO_SCHEMA));
+    }
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      GenericRecord record;
+      try {
+        record = genericRecordFromBytes(bytes, schema);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to deserialize the record", e);
+      }
+      return record;
+    }
+
+    @Override
+    public byte[] toBytes(Object o) {
+      GenericRecord record = (GenericRecord) o;
+      try {
+        return encodeAvroGenericRecord(schema, record);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to serialize the record", e);
+      }
+    }
+  }
+
+  public byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException {
+    DatumWriter<IndexedRecord> msgDatumWriter = new GenericDatumWriter<>(schema);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    msgDatumWriter.write(record, encoder);
+    encoder.flush();
+    return os.toByteArray();
+  }
+
+  private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException {
+    BinaryDecoder binDecoder = DecoderFactory.get().binaryDecoder(bytes, null);
+    GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
+    return reader.read(null, binDecoder);
+  }
+}
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+  /**
+   * A unique identifier for the ad
+   */
+  private final String adId;
+  /**
+   * The user that clicked the ad
+   */
+  private final String userId;
+  /**
+   * The id of the page that the ad was served from
+   */
+  private final String pageId;
+
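+  /**
+   * Constructs an {@link AdClick} from the provided string.
+   *
+   * @param message in the following CSV format - adId,userId,pageId
+   */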
+  public AdClick(String message) {
+    String[] adClickFields = message.split(",");
+    this.adId = adClickFields[0];
+    this.userId = adClickFields[1];
+    this.pageId = adClickFields[2];
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a page view event
+ */
+class PageView {
+  /**
+   * The user that viewed the page
+   */
+  private final String userId;
+  /**
+   * The region that the page was viewed from
+   */
+  private final String country;
+  /**
+   * A trackingId for the page
+   */
+  private final String pageId;
+
+  /**
+   * Constructs a {@link PageView} from the provided string.
+   *
+   * @param message in the following CSV format - userId,country,pageId
+   */
+  PageView(String message) {
+    String[] pageViewFields = message.split(",");
+    userId = pageViewFields[0];
+    country = pageViewFields[1];
+    pageId = pageViewFields[2];
+  }
+
+  String getUserId() {
+    return userId;
+  }
+
+  String getCountry() {
+    return country;
+  }
+
+  String getPageId() {
+    return pageId;
+  }
+}
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A sample task that consumes messages from a Kinesis stream and logs each message's key and consumption lag.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
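+    // Cast to KinesisIncomingMessageEnvelope to access Kinesis-specific metadata,
+    // such as the record's approximate arrival timestamp used to compute lag below.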
+    KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+    long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime();
+    LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs));
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql
new file mode 100644
index 0000000..ca9f854
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql
@@ -0,0 +1,12 @@
+-- For each profile in the Kafka profile change-capture stream, determine whether the
+-- profile is a quality profile and insert the result into the QualityProfile
+-- Kafka topic. Note the use of the GetSqlField UDF to extract the company
+-- name field from a nested record.
+
+INSERT INTO kafka.QualityProfile
+SELECT id, status, case when (profilePicture <> null and industryName <> null and
+GetSqlField(positions, 'Position.companyName') <> null)
+then 1 else 0 end as quality
+FROM kafka.ProfileChanges
+
+-- you can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql
new file mode 100644
index 0000000..9e7960c
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql
@@ -0,0 +1,9 @@
+-- Filter the profile change-capture stream by the 'Product Manager'
+-- title and project basic profile data to a Kafka topic.
+
+INSERT INTO kafka.ProductManagerProfiles
+SELECT memberId, firstName, lastName, company
+FROM kafka.ProfileChanges
+WHERE standardize(title) = 'Product Manager'
+
+-- you can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json
new file mode 100644
index 0000000..11680d1
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json
@@ -0,0 +1,18 @@
+{
+    "name": "PageViewGroupByOutput",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name":"pageKey",
+            "doc":"The page key of the page being viewed.",
+            "type":["string","null"]
+        },
+        {
+            "name": "Views",
+            "doc" : "Number of views in 5 minute window.",
+            "type": ["long", "null"]
+        }
+    ]
+}
diff --git a/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql
new file mode 100644
index 0000000..716bfaa
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql
@@ -0,0 +1,12 @@
+-- NOTE: Groupby Operator is currently not fully stable,
+--       we are actively working on stabilizing it.
+
+-- Emit page-view counts grouped by page key over the last 5 minutes,
+-- at a 5-minute interval, and send the result to a Kafka topic.
+-- UDFs are used to extract the page key from the requestHeader.
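+-- The projected fields (pageKey, Views) match the PageViewGroupByOutput Avro schema
+-- included alongside this example.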
+insert into kafka.groupbyTopic
+  select GetSqlField(pv.requestHeader) as __key__, GetPageKey(pv.requestHeader) as pageKey, count(*) as Views
+  from kafka.`PageViewEvent` as pv
+  group by GetSqlField(pv.requestHeader)
+
+-- You can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/resources/log4j.xml b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/resources/log4j.xml
new file mode 100644
index 0000000..8a80ddf
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/resources/log4j.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+    <param name="MaxFileSize" value="25MB" />
+    <param name="MaxBackupIndex" value="100" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+    </layout>
+  </appender>
+  <root>
+    <priority value="info" />
+    <appender-ref ref="RollingAppender"/>
+  </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql
new file mode 100644
index 0000000..c1b9be0
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql
@@ -0,0 +1,11 @@
+-- NOTE: Join Operator is currently not fully stable,
+--       we are actively working on stabilizing it.
+
+-- Enrich PageViewEvent with member profile data
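+-- The `$table` suffix marks ProfileChanges as the table side of this stream-table join.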
+INSERT INTO kafka.tracking.EnrichedPageViewEvent
+SELECT *
+FROM kafka.PageViewEvent as pv
+  JOIN kafka.ProfileChanges.`$table` as p
+    ON pv.memberid = p.memberid
+
+-- You can add additional SQL statements here