blob: 7d1658dd2a4f577e24271e6ddc408e6fe3aeef92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import java.util.Arrays;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
/**
* An example that counts words from a Kafka stream.
*
* <pre>
* 1. Reading data from a Kafka topic
* 2. Specifying 'inline' transforms
* 3. Assigning a window
* 4. Counting items in a PCollection
* 5. Writing data to an output Kafka topic
* </pre>
*
* <p>Create the input topic before running:
* <pre>{@code
* $ ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic input-text --partitions 10 --replication-factor 1
* }</pre>
*
* <p>To run locally:
* <pre>{@code
* $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.KafkaWordCount \
* -Dexec.args="--runner=SamzaRunner" -P samza-runner
* }</pre>
*
* <p>To execute the example in a distributed manner, package it with mvn first
* (for YARN deployment, remove .waitUntilFinish() from the code):
* <pre>{@code
* $ mkdir -p deploy/examples
* $ mvn package && tar -xvf target/samza-beam-examples-0.1-dist.tar.gz -C deploy/examples/
* }</pre>
*
* <p>To run in standalone with zookeeper:
* (a large --maxSourceParallelism ensures each input partition is processed in its own task)
* <pre>{@code
* $ deploy/examples/bin/run-beam-standalone.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/standalone.properties --maxSourceParallelism=1024
* }</pre>
*
* <p>To run in yarn:
* (a large --maxSourceParallelism ensures each input partition is processed in its own task)
* <pre>{@code
* $ deploy/examples/bin/run-beam-yarn.sh org.apache.beam.examples.KafkaWordCount --configFilePath=$PWD/deploy/examples/config/yarn.properties --maxSourceParallelism=1024
* }</pre>
*
* <p>To produce some test data:
* <pre>{@code
* $ ./deploy/kafka/bin/kafka-console-producer.sh --topic input-text --broker-list localhost:9092
* Nory was a Catholic because her mother was a Catholic, and Nory’s mother was a Catholic because her father was a Catholic, and her father was a Catholic because his mother was a Catholic, or had been.
* }</pre>
*
* <p>To verify output:
* <pre>{@code
* $ ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic word-count --property print.key=true
* }</pre>
*
*/
public class KafkaWordCount {
  /** Kafka broker list used both for reading the input and for writing the results. */
  private static final String BOOTSTRAP_SERVERS = "localhost:9092";

  /** Topic from which lines of text are read. */
  private static final String INPUT_TOPIC = "input-text";

  /** Topic to which the per-window word counts are written. */
  private static final String OUTPUT_TOPIC = "word-count";

  /** Size of the fixed windows over which words are counted. */
  private static final Duration WINDOW_SIZE = Duration.standardSeconds(10);

  /**
   * Word separator: any run of characters that are not Unicode letters. Compiled once here
   * rather than on every element, which is what {@code String.split} would do for this regex.
   */
  private static final Pattern NON_LETTERS = Pattern.compile("[^\\p{L}]+");

  /**
   * Builds and runs the pipeline. It reads lines from {@link #INPUT_TOPIC}, splits them into
   * words, and counts each word per {@link #WINDOW_SIZE} fixed window. The counts are written
   * to {@link #OUTPUT_TOPIC} as (word, count-as-string) records.
   *
   * @param args pipeline options, e.g. {@code --runner=SamzaRunner}
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setJobName("kafka-word-count");
    // Create the Pipeline object with the options defined above.
    Pipeline p = Pipeline.create(options);
    p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withTopic(INPUT_TOPIC)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        // Keep only the record values: each value is a line of text.
        .apply(Values.create())
        // Split every line into words, yielding a PCollection<String> with one element per word.
        .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(NON_LETTERS.split(line))))
        // Splitting yields an empty token when a line starts with a non-letter (or is empty).
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Window.into(FixedWindows.of(WINDOW_SIZE)))
        // Count returns one KV per distinct word in each window: the word and how often it
        // occurred.
        .apply(Count.perElement())
        // The output topic uses string serializers, so render the count as a string.
        .apply(
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(
                    (KV<String, Long> wordCount) ->
                        KV.of(wordCount.getKey(), String.valueOf(wordCount.getValue()))))
        .apply(
            KafkaIO.<String, String>write()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withTopic(OUTPUT_TOPIC)
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));
    // For YARN there is no need to wait after submitting the job: replace the line below
    // with a plain p.run().
    p.run().waitUntilFinish();
  }
}