Add a Kafka word count example
diff --git a/pom.xml b/pom.xml
index 49e4f76..2db2df0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,15 +156,20 @@
     <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
       <version>${beam.version}</version>
+      <exclusions>
+        <exclusion>  <!-- exclude kafka-clients -->
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-samza</artifactId>
       <version>${beam.version}</version>
-      <scope>runtime</scope>
     </dependency>
 
     <dependency>
diff --git a/src/main/java/org/apache/beam/examples/KafkaWordCount.java b/src/main/java/org/apache/beam/examples/KafkaWordCount.java
new file mode 100644
index 0000000..a4e5001
--- /dev/null
+++ b/src/main/java/org/apache/beam/examples/KafkaWordCount.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+
+/**
+ * An example that counts words from Kafka stream
+ *
+ * <pre>
+ *   1. Reading data from a Kafka topic
+ *   2. Specifying 'inline' transforms
+ *   3. Assign a window
+ *   4. Counting items in a PCollection
+ *   5. Writing data to an output kakfa topic
+ * </pre>
+ *
+ * <p>Create the input topic before running:
+ * <pre>{@code
+ * $ ./deploy/kafka/bin/kafka-topics.sh  --zookeeper localhost:2181 --create --topic input-text --partitions 10 --replication-factor 1
+ * }</pre>
+ *
+ * <p>To run locally:
+ * <pre>{@code
+ * $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.KafkaWordCount \
+ * -Dexec.args="--runner=SamzaRunner" -P samza-runner
+ * }</pre>
+ *
+ * <p>To execute the example in distributed manner, use mvn to package it first:
+ * <pre>{@code
+ * $ mkdir -p deploy/examples
+ * $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
+ * }</pre>
+ *
+ * <p>TO run in standalone with zookeeper:
+ * <pre>{@code
+ * $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/word-count-standalone.properties --maxSourceParallelism=1024
+ * }</pre>
+ *
+ * <p>To produce some test data:
+ * <pre>{@code
+ * $ ./deploy/kafka/bin/kafka-console-producer.sh --topic input-text --broker-list localhost:9092 <br/>
+ * Nory was a Catholic because her mother was a Catholic, and Nory’s mother was a Catholic because her father was a Catholic, and her father was a Catholic because his mother was a Catholic, or had been. </br>
+ * }</pre>
+ *
+ * <p>To verify output:
+ * <pre>{@code
+ * $ ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic word-count --property print.key=true
+ * }</pre>
+ *
+ */
+public class KafkaWordCount {
+
+  public static void main(String[] args) {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setJobName("kafka-word-count");
+
+    // Create the Pipeline object with the options we defined above
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+        KafkaIO.<String, String>read()
+            .withBootstrapServers("localhost:9092")
+            .withTopic("input-text")
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializer(StringDeserializer.class)
+            .withoutMetadata())
+        .apply(Values.create())
+        // Apply a FlatMapElements transform the PCollection of text lines.
+        // This transform splits the lines in PCollection<String>, where each element is an
+        // individual word in Shakespeare's collected texts.
+        .apply(
+            FlatMapElements.into(TypeDescriptors.strings())
+                .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
+        // We use a Filter transform to avoid empty word
+        .apply(Filter.by((String word) -> !word.isEmpty()))
+        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
+        // Apply the Count transform to our PCollection of individual words. The Count
+        // transform returns a new PCollection of key/value pairs, where each key represents a
+        // unique word in the text. The associated value is the occurrence count for that word.
+        .apply(Count.perElement())
+        .apply(MapElements
+            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
+            .via(kv -> KV.of(kv.getKey(), String.valueOf(kv.getValue()))))
+        .apply(KafkaIO.<String, String>write()
+            .withBootstrapServers("localhost:9092")
+            .withTopic("word-count")
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(StringSerializer.class));
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/src/main/java/org/apache/beam/examples/WordCount.java b/src/main/java/org/apache/beam/examples/WordCount.java
index 8ba229a..96751ae 100644
--- a/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/src/main/java/org/apache/beam/examples/WordCount.java
@@ -55,7 +55,6 @@
  * </pre>
  *
  * <p>To execute this example locally:
- *
  * <pre>{@code
  * $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
  *      -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner