| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.flink.examples; |
| |
import java.util.regex.Pattern;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
| |
| /** |
| * Wordcount pipeline. |
| */ |
| public class WordCount { |
| |
| /** |
| * Function to extract words. |
| */ |
| public static class ExtractWordsFn extends DoFn<String, String> { |
| private final Aggregator<Long, Long> emptyLines = |
| createAggregator("emptyLines", new Sum.SumLongFn()); |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| if (c.element().trim().isEmpty()) { |
| emptyLines.addValue(1L); |
| } |
| |
| // Split the line into words. |
| String[] words = c.element().split("[^a-zA-Z']+"); |
| |
| // Output each word encountered into the output PCollection. |
| for (String word : words) { |
| if (!word.isEmpty()) { |
| c.output(word); |
| } |
| } |
| } |
| } |
| |
| /** |
| * PTransform counting words. |
| */ |
| public static class CountWords extends PTransform<PCollection<String>, |
| PCollection<KV<String, Long>>> { |
| @Override |
| public PCollection<KV<String, Long>> expand(PCollection<String> lines) { |
| |
| // Convert lines of text into individual words. |
| PCollection<String> words = lines.apply( |
| ParDo.of(new ExtractWordsFn())); |
| |
| // Count the number of times each word occurs. |
| PCollection<KV<String, Long>> wordCounts = |
| words.apply(Count.<String>perElement()); |
| |
| return wordCounts; |
| } |
| } |
| |
| /** A SimpleFunction that converts a Word and Count into a printable string. */ |
| public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { |
| @Override |
| public String apply(KV<String, Long> input) { |
| return input.getKey() + ": " + input.getValue(); |
| } |
| } |
| |
| /** |
| * Options supported by {@link WordCount}. |
| * |
| * <p>Inherits standard configuration options. |
| */ |
| public interface Options extends PipelineOptions, FlinkPipelineOptions { |
| @Description("Path of the file to read from") |
| String getInput(); |
| void setInput(String value); |
| |
| @Description("Path of the file to write to") |
| @Validation.Required |
| String getOutput(); |
| void setOutput(String value); |
| } |
| |
| public static void main(String[] args) { |
| |
| Options options = PipelineOptionsFactory.fromArgs(args).withValidation() |
| .as(Options.class); |
| options.setRunner(FlinkRunner.class); |
| |
| Pipeline p = Pipeline.create(options); |
| |
| p.apply("ReadLines", TextIO.Read.from(options.getInput())) |
| .apply(new CountWords()) |
| .apply(MapElements.via(new FormatAsTextFn())) |
| .apply("WriteCounts", TextIO.Write.to(options.getOutput())); |
| |
| p.run(); |
| } |
| |
| } |