blob: a5cddfd005e7262fba75610cdb89be437d9547b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.util.Properties;
/**
* Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
* that computes a simple word occurrence histogram from an input text.
*
* In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
* represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
* is an updated count of a single word.
*
* Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
* and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
*/
public class WordCountProcessorDemo {
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
}
@Override
public void process(String dummy, String line) {
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
context.commit();
}
@Override
public void punctuate(long timestamp) {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
System.out.println("----------- " + timestamp + " ----------- ");
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
context.forward(entry.key, entry.value.toString());
}
iter.close();
}
@Override
public void close() {
this.kvStore.close();
}
};
}
}
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", "streams-file-input");
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);
streams.close();
}
}