blob: e76096ef78d1fd5f47acf6219dbeed1f6d6646e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.apex.examples;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.io.FileUtils;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
/**
* Windowed word count example on Apex runner.
*/
public class WordCountTest {
static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue()
+ " @ " + c.timestamp().toString();
c.output(row);
}
}
static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 1L;
private final Counter emptyLines = Metrics.counter("main", "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.inc(1);
}
// Split the line into words.
String[] words = c.element().split("[^a-zA-Z']+");
// Output each word encountered into the output PCollection.
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
/**
* Options for word count example.
*/
public interface WordCountOptions extends ApexPipelineOptions {
@Description("Path of the file to read from")
@Validation.Required
String getInputFile();
void setInputFile(String value);
@Description("Path of the file to write to")
@Validation.Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()))
;
p.run().waitUntilFinish();
}
@Test
public void testWordCountExample() throws Exception {
PipelineOptionsFactory.register(WordCountOptions.class);
WordCountOptions options = TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
options.setRunner(TestApexRunner.class);
options.setApplicationName("StreamingWordCount");
String inputFile = WordCountTest.class.getResource("/words.txt").getFile();
options.setInputFile(new File(inputFile).getAbsolutePath());
String outputFilePrefix = "target/wordcountresult.txt";
options.setOutput(outputFilePrefix);
WordCountTest.main(TestPipeline.convertToArgs(options));
File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
Assert.assertTrue(outFile1.exists() && outFile2.exists());
HashSet<String> results = new HashSet<>();
results.addAll(FileUtils.readLines(outFile1));
results.addAll(FileUtils.readLines(outFile2));
HashSet<String> expectedOutput = Sets.newHashSet(
"foo - 5 @ 294247-01-09T04:00:54.775Z",
"bar - 5 @ 294247-01-09T04:00:54.775Z"
);
Assert.assertEquals("expected output", expectedOutput, results);
}
static class CollectResultsFn extends DoFn<KV<String, Long>, String> {
static final ConcurrentHashMap<String, Long> RESULTS = new ConcurrentHashMap<>();
@ProcessElement
public void processElement(ProcessContext c) {
RESULTS.put(c.element().getKey(), c.element().getValue());
}
}
@Test
public void testWindowedWordCount() throws Exception {
String[] args = new String[] {
"--runner=" + ApexRunner.class.getName()
};
ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(ApexPipelineOptions.class);
options.setApplicationName("StreamingWordCount");
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> wordCounts =
p.apply(Read.from(new UnboundedTextSource()))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
.apply(Count.<String>perElement());
wordCounts.apply(ParDo.of(new CollectResultsFn()));
ApexRunnerResult result = (ApexRunnerResult) p.run();
Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
long timeout = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < timeout) {
if (CollectResultsFn.RESULTS.containsKey("foo")
&& CollectResultsFn.RESULTS.containsKey("bar")) {
break;
}
result.waitUntilFinish(Duration.millis(1000));
}
result.cancel();
Assert.assertTrue(
CollectResultsFn.RESULTS.containsKey("foo") && CollectResultsFn.RESULTS.containsKey("bar"));
CollectResultsFn.RESULTS.clear();
}
}