blob: ffd2a03f96884af9f850391f09abd737abb1a959 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.util.KeyValPair;
import static;
* Auto Complete Hashtag Demo with real-time Twitter input. To run this application, you need to create an app
* on the Twitter developer site, then generate your consumer and access keys and tokens, and enter that information
* in /resources/META-INF/properties.xml.
* Authentication requires the following four values:
* your application consumer key,
* your application consumer secret,
* your Twitter access token, and
* your Twitter access token secret.
* @since 3.5.0
@ApplicationAnnotation(name = "TwitterAutoComplete")
public class TwitterAutoComplete implements StreamingApplication
* Check whether every character in a string is ASCII encoding.
public static class StringUtils
static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
public static boolean isAscii(String v)
return encoder.canEncode(v);
* FlapMap Function to extract all hashtags from a text form tweet.
private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
public Iterable<String> f(String input)
List<String> result = new LinkedList<>();
Matcher m = Pattern.compile("#\\S+").matcher(input);
while (m.find()) {
return result;
* Lower latency, but more expensive.
private static class ComputeTopFlat
extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
private final int candidatesPerPrefix;
private final int minPrefix;
public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
this.candidatesPerPrefix = candidatesPerPrefix;
this.minPrefix = minPrefix;
public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
WindowedStream<CompletionCandidate> input)
TopNByKey topNByKey = new TopNByKey();
return input
.<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
.accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
// TODO: Should be removed after Auto-wrapping is supported.
return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
}, name("TopNByKey"));
* FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
* KeyValPairs of the prefix and the CompletionCandidate
private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
private final int minPrefix;
private final int maxPrefix;
public AllPrefixes()
this(0, Integer.MAX_VALUE);
public AllPrefixes(int minPrefix)
this(minPrefix, Integer.MAX_VALUE);
public AllPrefixes(int minPrefix, int maxPrefix)
this.minPrefix = minPrefix;
this.maxPrefix = maxPrefix;
public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
String word = input.getValue();
for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
return result;
* A composite stream transform that takes a windowed stream of tokens (hashtags) and returns
* the most common tokens per prefix.
* Built via the static factory {@code top(candidatesPerPrefix, recursive)}.
public static class ComputeTopCompletions
extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
// Number of candidates to keep per prefix; forwarded to ComputeTopFlat in compose().
private final int candidatesPerPrefix;
// NOTE(review): stored but never read by compose() below -- only the flat strategy is implemented.
// TODO confirm whether a recursive strategy is planned or this flag should be documented as ignored.
private final boolean recursive;
protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
this.candidatesPerPrefix = candidatesPerPrefix;
this.recursive = recursive;
// Static factory; equivalent to calling the protected constructor.
public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
return new ComputeTopCompletions(candidatesPerPrefix, recursive);
// Counts each token, wraps (token, count) as a CompletionCandidate, then delegates to ComputeTopFlat.
public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
ApexStream<CompletionCandidate> candidates = inputStream
// Key every token by itself with a count of 1 so countByKey tallies occurrences per token.
.countByKey(new Function.ToKeyValue<String, String, Long>()
public Tuple<KeyValPair<String, Long>> f(String input)
return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
}, name("Hashtag Count"))
// Convert each windowed (token, count) pair into a CompletionCandidate(token, count).
.map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
}, name("KeyValPair to CompletionCandidate"));
// Minimum prefix length is fixed at 1 here.
return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
* FilterFunction to filter out tweets with non-acsii characters.
static class ASCIIFilter implements Function.FilterFunction<String>
public boolean f(String input)
return StringUtils.isAscii(input);
* Populates the DAG using the high-level stream API.
* @param dag the DAG to populate
* @param conf the Hadoop configuration
public void populateDAG(DAG dag, Configuration conf)
TwitterSampleInput input = new TwitterSampleInput();
WindowOption windowOption = new WindowOption.GlobalWindow();
ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
.filter(new ASCIIFilter(), name("ACSII Filter"))
.flatMap(new ExtractHashtags(), name("Extract Hashtags"));
ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
.addCompositeStreams(, true)).print();