/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.examples.wordcount.WordCount;
import org.apache.flink.streaming.examples.wordcount.util.CLI;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;

import java.time.Duration;
/**
* Implements a windowed version of the streaming "WordCount" program.
*
* <p>The input is a plain text file with lines separated by newline characters.
*
 * <p>Usage: <code>
 * WindowWordCount --input &lt;path&gt; --output &lt;path&gt; --window &lt;n&gt; --slide &lt;n&gt;</code>
* <br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>This example shows how to:
*
* <ul>
* <li>write a simple Flink Streaming program,
* <li>use tuple data types,
* <li>use basic windowing abstractions.
* </ul>
*/
public class WindowWordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final CLI params = CLI.fromArgs(args);
// Create the execution environment. This is the main entrypoint
// to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Apache Flink’s unified approach to stream and batch processing means that a DataStream
// application executed over bounded input will produce the same final results regardless
// of the configured execution mode. It is important to note what final means here: a job
// executing in STREAMING mode might produce incremental updates (think upserts in
// a database) while a BATCH job would only produce one final result at the end. The final
// result will be the same if interpreted correctly, but getting there can be different.
//
// The “classic” execution behavior of the DataStream API is called STREAMING execution
// mode. Applications should use streaming execution for unbounded jobs that require
// continuous incremental processing and are expected to stay online indefinitely.
//
// By enabling BATCH execution, we allow Flink to apply additional optimizations that we
// can only do when we know that our input is bounded. For example, different
// join/aggregation strategies can be used, in addition to a different shuffle
// implementation that allows more efficient task scheduling and failure recovery behavior.
//
// By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
// are bounded and otherwise STREAMING.
env.setRuntimeMode(params.getExecutionMode());
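        // Note: the mode could also be pinned explicitly, e.g.
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH), using
        // org.apache.flink.api.common.RuntimeExecutionMode.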
// This optional step makes the input parameters
// available in the Flink UI.
env.getConfig().setGlobalJobParameters(params);
DataStream<String> text;
if (params.getInputs().isPresent()) {
// Create a new file source that will read files from a given set of directories.
// Each file will be processed as plain text and split based on newlines.
FileSource.FileSourceBuilder<String> builder =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), params.getInputs().get());
// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.
params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
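            // For example, builder.monitorContinuously(Duration.ofSeconds(10))
            // would re-scan the input directories every ten seconds, turning
            // this bounded source into an unbounded one.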
text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
} else {
text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
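            // fromElements creates a bounded source, so under AUTOMATIC mode
            // this variant of the job runs in BATCH.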
}
int windowSize = params.getInt("window").orElse(250);
int slideSize = params.getInt("slide").orElse(150);
DataStream<Tuple2<String, Integer>> counts =
                // The text lines read from the source are split into words
                // using a user-defined function. The tokenizer, reused from the
                // basic WordCount example, outputs each word as a (word, 1) tuple.
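                // As a rough sketch, its flatMap behaves like the following
                // (the actual implementation lives in the WordCount example):
                //
                //   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                //       for (String token : value.toLowerCase().split("\\W+")) {
                //           if (!token.isEmpty()) {
                //               out.collect(new Tuple2<>(token, 1));
                //           }
                //       }
                //   }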
text.flatMap(new WordCount.Tokenizer())
.name("tokenizer")
// keyBy groups tuples based on the "0" field, the word.
// Using a keyBy allows performing aggregations and other
// stateful transformations over data on a per-key basis.
// This is similar to a GROUP BY clause in a SQL query.
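                        // A rough SQL analogue of the keyBy plus windowed sum below:
                        //   SELECT word, SUM(cnt) FROM words GROUP BY word
                        // evaluated per window rather than over the whole table.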
.keyBy(value -> value.f0)
                        // Create count windows of windowSize records that slide
                        // forward by slideSize records.
.countWindow(windowSize, slideSize)
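                        // For instance, windowSize = 4 and slideSize = 2 means each
                        // word fires a window after its 2nd, 4th, 6th, ... occurrence,
                        // each covering at most that word's last 4 occurrences.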
                        // For each key, we perform a simple sum of the "1" field, the
                        // count, over the records in each window. A new result is
                        // emitted every time a window fires, so each word produces an
                        // updated count after every slideSize occurrences.
.sum(1)
.name("counter");
if (params.getOutput().isPresent()) {
// Given an output directory, Flink will write the results to a file
// using a simple string encoding. In a production environment, this might
// be something more structured like CSV, Avro, JSON, or Parquet.
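            // SimpleStringEncoder writes each record's toString(), so output
            // lines take the form "(word,count)", e.g. "(flink,3)".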
counts.sinkTo(
FileSink.<Tuple2<String, Integer>>forRowFormat(
params.getOutput().get(), new SimpleStringEncoder<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(MemorySize.ofMebiBytes(1))
.withRolloverInterval(Duration.ofSeconds(10))
.build())
.build())
.name("file-sink");
} else {
counts.print().name("print-sink");
}
// Apache Flink applications are composed lazily. Calling execute
// submits the Job and begins processing.
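        // execute() blocks until the job finishes, which for an unbounded
        // source means it runs indefinitely; executeAsync() would instead
        // return a JobClient without blocking.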
env.execute("WindowWordCount");
}
}