Merge pull request #50 from apache/latest
diff --git a/.reviewboardrc b/.reviewboardrc
index a989cb2..dc119ba 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -19,4 +19,4 @@
REPOSITORY = 'samza-hello-samza'
GUESS_DESCRIPTION = True
TARGET_GROUPS = 'samza'
-TRACKING_BRANCH = 'origin/latest'
+TRACKING_BRANCH = 'origin/master'
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 3c3ada2..d6ce59c 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@
mvn clean package
mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.0-dist.tar.gz -C $base_dir/deploy/samza
diff --git a/bin/grid b/bin/grid
index 638fd48..3cd0d30 100755
--- a/bin/grid
+++ b/bin/grid
@@ -35,7 +35,7 @@
COMMAND=$1
SYSTEM=$2
-DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
+DOWNLOAD_KAFKA=https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz
DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
@@ -79,7 +79,6 @@
}
install_samza() {
- echo "Building samza from master..."
mkdir -p "$DEPLOY_ROOT_DIR"
if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then
pushd "$DOWNLOAD_CACHE_DIR/samza"
@@ -113,7 +112,7 @@
install_kafka() {
mkdir -p "$DEPLOY_ROOT_DIR"
- install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.1.1
+ install kafka $DOWNLOAD_KAFKA kafka_2.11-0.10.2.1
# have to use SIGTERM since nohup on appears to ignore SIGINT
# and Kafka switched to SIGINT in KAFKA-1031.
sed -i.bak 's/SIGINT/SIGTERM/g' $DEPLOY_ROOT_DIR/kafka/bin/kafka-server-stop.sh
diff --git a/build.gradle b/build.gradle
index 80dafea..143995d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -56,6 +56,7 @@
compile(group: 'org.schwering', name: 'irclib', version: '1.10')
compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
+  compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-test_2.11', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
@@ -64,6 +65,7 @@
explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
+
runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.kafka', name: 'kafka_2.11', version: "$KAFKA_VERSION")
diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
index 9028590..6c48260 100644
--- a/conf/yarn-site.xml
+++ b/conf/yarn-site.xml
@@ -30,4 +30,8 @@
<name>yarn.resourcemanager.hostname</name>
<value>127.0.0.1</value>
</property>
+ <property>
+ <name>yarn.nodemanager.delete.debug-delay.sec</name>
+ <value>86400</value>
+ </property>
</configuration>
diff --git a/pom.xml b/pom.xml
index a556d96..3974880 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>1.0.0-SNAPSHOT</version>
+ <version>1.0.0</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -132,6 +132,11 @@
<version>${samza.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.samza</groupId>
+ <artifactId>samza-aws</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is the documented AWS example key, not a real credential. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIOSFODNN7EXAMPLE
+# Secret key below is the documented AWS example key, not a real credential. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/java/samza/examples/avro/AvroSerDeFactory.java b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
new file mode 100644
index 0000000..c96eedd
--- /dev/null
+++ b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
+/**
+ * {@link SerdeFactory} that creates a serde for Avro {@link GenericRecord}s. The schema is read as a
+ * JSON string from the {@link #CFG_AVRO_SCHEMA} config key and is used for both encoding and decoding.
+ */
+public class AvroSerDeFactory implements SerdeFactory {
+
+  /** Config key whose value is the Avro schema in JSON form. */
+  public static final String CFG_AVRO_SCHEMA = "serializers.avro.schema";
+
+  /**
+   * Creates the Avro serde.
+   *
+   * @param name serde name passed in by the caller; not used here
+   * @param config config that must contain {@link #CFG_AVRO_SCHEMA}
+   * @return a serde bound to the configured schema
+   * @throws SamzaException if the schema config is missing
+   */
+  @Override
+  public Serde getSerde(String name, Config config) {
+    return new AvroSerDe(config);
+  }
+
+  /** Serde that converts {@link GenericRecord}s to and from Avro binary using one fixed schema. */
+  private class AvroSerDe implements Serde {
+    private final Schema schema;
+
+    public AvroSerDe(Config config) {
+      String schemaJson = config.get(CFG_AVRO_SCHEMA);
+      if (schemaJson == null) {
+        // Report the missing key by name rather than failing inside the Avro parser.
+        throw new SamzaException("Missing required config: " + CFG_AVRO_SCHEMA);
+      }
+      // Schema.parse(String) is deprecated; Schema.Parser is its replacement.
+      schema = new Schema.Parser().parse(schemaJson);
+    }
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      try {
+        GenericRecord record = genericRecordFromBytes(bytes, schema);
+        return record;
+      } catch (IOException e) {
+        throw new SamzaException("Unable to deserialize the record", e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(Object o) {
+      GenericRecord record = (GenericRecord) o;
+      try {
+        return encodeAvroGenericRecord(schema, record);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to serialize the record", e);
+      }
+    }
+  }
+
+  /**
+   * Encodes a record with Avro binary encoding.
+   *
+   * @param schema writer schema for the record
+   * @param record record to encode
+   * @return the encoded bytes
+   * @throws IOException if the Avro writer or encoder fails
+   */
+  public byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException {
+    DatumWriter<IndexedRecord> msgDatumWriter = new GenericDatumWriter<>(schema);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+    msgDatumWriter.write(record, encoder);
+    // binaryEncoder returns a buffering encoder, so flush before copying the bytes out.
+    encoder.flush();
+    return os.toByteArray();
+  }
+
+  /** Decodes Avro binary bytes into a datum, using the given schema for reading. */
+  private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException {
+    // DecoderFactory.get().binaryDecoder(...) replaces the deprecated defaultFactory().createBinaryDecoder(...).
+    BinaryDecoder binDecoder = DecoderFactory.get().binaryDecoder(bytes, null);
+    GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
+    return reader.read(null, binDecoder);
+  }
+}
diff --git a/src/main/java/samza/examples/cookbook/AdClick.java b/src/main/java/samza/examples/cookbook/AdClick.java
new file mode 100644
index 0000000..2d15cec
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/AdClick.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.cookbook;
+
+/**
+ * Represents an ad click event.
+ */
+public class AdClick {
+  /** A unique identifier for the ad. */
+  private final String adId;
+  /** The user that clicked the ad. */
+  private final String userId;
+  /** The id of the page that the ad was served from. */
+  private final String pageId;
+
+  /**
+   * Builds an {@link AdClick} by splitting the message on commas.
+   * @param message comma-separated text whose first three fields are adId, userId and pageId
+   */
+  public AdClick(String message) {
+    String[] parts = message.split(",");
+    adId = parts[0];
+    userId = parts[1];
+    pageId = parts[2];
+  }
+
+  /** @return the ad identifier (first field of the message) */
+  public String getAdId() {
+    return adId;
+  }
+
+  /** @return the user identifier (second field of the message) */
+  public String getUserId() {
+    return userId;
+  }
+
+  /** @return the page identifier (third field of the message) */
+  public String getPageId() {
+    return pageId;
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/cookbook/PageView.java b/src/main/java/samza/examples/cookbook/PageView.java
new file mode 100644
index 0000000..7803db7
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+/**
+ * Represents a Page view event
+ */
+class PageView {
+  /** The user that viewed the page. */
+  private final String userId;
+
+  /** The region that the page was viewed from. */
+  private final String country;
+
+  /** A trackingId for the page. */
+  private final String pageId;
+
+  /**
+   * Constructs a {@link PageView} from the provided string. Fields are read by position
+   * after splitting on commas; they are not trimmed or validated.
+   *
+   * @param message in the following CSV format - userId,country,pageId
+   */
+  PageView(String message) {
+    String[] parts = message.split(",");
+    this.userId = parts[0];
+    this.country = parts[1];
+    this.pageId = parts[2];
+  }
+
+  /** @return the user id (first CSV field) */
+  String getUserId() {
+    return userId;
+  }
+
+  /** @return the country (second CSV field) */
+  String getCountry() {
+    return country;
+  }
+
+  /** @return the page id (third CSV field) */
+  String getPageId() {
+    return pageId;
+  }
+}
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Example {@link StreamTask} that consumes a Kinesis stream and logs every message's key and lag.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+  /** Logs the message key and the lag: current time minus the envelope's approximate arrival timestamp. */
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    long lagMs = System.currentTimeMillis()
+        - ((KinesisIncomingMessageEnvelope) envelope).getApproximateArrivalTimestamp().getTime();
+    LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs));
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql
new file mode 100644
index 0000000..ca9f854
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql
@@ -0,0 +1,12 @@
+-- For each profile in the Kafka Profile change-capture stream, decide whether it is a
+-- quality profile and insert the result into the QualityProfile Kafka topic. Null tests
+-- use IS NOT NULL because a "<> null" comparison never evaluates to true in SQL. The
+-- GetSqlField UDF extracts the company name field from the nested positions record.
+
+INSERT INTO kafka.QualityProfile
+SELECT id, status, case when (profilePicture is not null and industryName is not null and
+GetSqlField(positions, 'Position.companyName') is not null)
+then 1 else 0 end as quality
+FROM kafka.ProfileChanges
+
+-- you can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql
new file mode 100644
index 0000000..9e7960c
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql
@@ -0,0 +1,9 @@
+-- Filter Profile change-capture stream by 'Product Manager'
+-- title and project basic profile data to a kafka topic.
+
+INSERT INTO kafka.ProductManagerProfiles
+SELECT memberId, firstName, lastName, company
+FROM kafka.ProfileChanges
+WHERE standardize(title) = 'Product Manager'
+
+-- you can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json
new file mode 100644
index 0000000..11680d1
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json
@@ -0,0 +1,18 @@
+{
+ "name": "PageViewGroupByOutput",
+ "version" : 1,
+ "namespace": "org.apache.samza.sql.system.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name":"pageKey",
+ "doc":"The page key of the page being viewed.",
+ "type":["string","null"]
+ },
+ {
+ "name": "Views",
+ "doc" : "Number of views in 5 minute window.",
+ "type": ["long", "null"]
+ }
+ ]
+}
diff --git a/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql
new file mode 100644
index 0000000..716bfaa
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql
@@ -0,0 +1,12 @@
+-- NOTE: Groupby Operator is currently not fully stable; we are actively working on stabilizing it.
+-- NOTE(review): pageKey is selected with GetPageKey while the grouping key uses GetSqlField - confirm this is intended.
+
+-- Emit Page view counts collected grouped by page key in the last
+-- 5 minutes at 5 minute interval and send the result to a kafka topic.
+-- Using GetSqlField UDF to extract page key from the requestHeader.
+insert into kafka.groupbyTopic
+ select GetSqlField(pv.requestHeader) as __key__, GetPageKey(pv.requestHeader) as pageKey, count(*) as Views
+ from kafka.`PageViewEvent` as pv
+ group by GetSqlField(pv.requestHeader)
+
+-- You can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/resources/log4j.xml b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/resources/log4j.xml
new file mode 100644
index 0000000..8a80ddf
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/resources/log4j.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+ <param name="MaxFileSize" value="25MB" />
+ <param name="MaxBackupIndex" value="100" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+ </layout>
+ </appender>
+ <root>
+ <priority value="info" />
+ <appender-ref ref="RollingAppender"/>
+ </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql
new file mode 100644
index 0000000..c1b9be0
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql
@@ -0,0 +1,11 @@
+-- NOTE: Join Operator is currently not fully stable; we are actively working on stabilizing it.
+-- NOTE(review): the sink topic is spelled "EnrichedPageVIewEvent" and the sources use "Kafka." while the sink uses "kafka." - confirm both.
+
+-- Enrich PageViewEvent with member profile data
+INSERT INTO kafka.tracking.EnrichedPageVIewEvent
+SELECT *
+FROM Kafka.PageViewEvent as pv
+ JOIN Kafka.ProfileChanges.`$table` as p
+ ON pv.memberid = p.memberid
+
+-- You can add additional SQL statements here