blob: 6e96f71215d0ccb45ddd01180906aedddd890e34 [file] [log] [blame] [view]
---
title: Intro to the DataStream API
nav-id: datastream-api
nav-pos: 2
nav-title: Intro to the DataStream API
nav-parent_id: learn-flink
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
The focus of this training is to broadly cover the DataStream API well enough that you will be able
to get started writing streaming applications.
* This will be replaced by the TOC
{:toc}
## What can be Streamed?
Flink's DataStream APIs for Java and Scala will let you stream anything they can serialize. Flink's
own serializer is used for
- basic types, i.e., String, Long, Integer, Boolean, Array
- composite types: Tuples, POJOs, and Scala case classes
and Flink falls back to Kryo for other types. It is also possible to use other serializers with
Flink. Avro, in particular, is well supported.
### Java tuples and POJOs
Flink's native serializer can operate efficiently on tuples and POJOs.
#### Tuples
For Java, Flink defines its own `Tuple0` thru `Tuple25` types.
{% highlight java %}
Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
// zero based index!
String name = person.f0;
Integer age = person.f1;
{% endhighlight %}
#### POJOs
Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:
- The class is public and standalone (no non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are either public (and
non-final) or have public getter- and setter- methods that follow the Java beans naming
conventions for getters and setters.
Example:
{% highlight java %}
public class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
. . .
};
}
Person person = new Person("Fred Flintstone", 35);
{% endhighlight %}
Flink's serializer [supports schema evolution for POJO types]({% link dev/stream/state/schema_evolution.md %}#pojo-types).
### Scala tuples and case classes
These work just as you'd expect.
{% top %}
## A Complete Example
This example takes a stream of records about people as input, and filters it to only include the adults.
{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
this.name = name;
this.age = age;
};
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
};
}
}
{% endhighlight %}
### Stream execution environment
Every Flink application needs an execution environment, `env` in this example. Streaming
applications need to use a `StreamExecutionEnvironment`.
The DataStream API calls made in your application build a job graph that is attached to the
`StreamExecutionEnvironment`. When `env.execute()` is called this graph is packaged up and sent to
the JobManager, which parallelizes the job and distributes slices of it to the Task Managers for
execution. Each parallel slice of your job will be executed in a *task slot*.
Note that if you don't call execute(), your application won't be run.
<img src="{% link /fig/distributed-runtime.svg %}" alt="Flink runtime: client, job manager, task managers" class="offset" width="80%" />
This distributed runtime depends on your application being serializable. It also requires that all
dependencies are available to each node in the cluster.
### Basic stream sources
The example above constructs a `DataStream<Person>` using `env.fromElements(...)`. This is a
convenient way to throw together a simple stream for use in a prototype or test. There is also a
`fromCollection(Collection)` method on `StreamExecutionEnvironment`. So instead, you could do this:
{% highlight java %}
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
{% endhighlight %}
Another convenient way to get some data into a stream while prototyping is to use a socket
{% highlight java %}
DataStream<String> lines = env.socketTextStream("localhost", 9999)
{% endhighlight %}
or a file
{% highlight java %}
DataStream<String> lines = env.readTextFile("file:///path");
{% endhighlight %}
In real applications the most commonly used data sources are those that support low-latency, high
throughput parallel reads in combination with rewind and replay -- the prerequisites for high
performance and fault tolerance -- such as Apache Kafka, Kinesis, and various filesystems. REST APIs
and databases are also frequently used for stream enrichment.
### Basic stream sinks
The example above uses `adults.print()` to print its results to the task manager logs (which will
appear in your IDE's console, when running in an IDE). This will call `toString()` on each element
of the stream.
The output looks something like this
1> Fred: age 35
2> Wilma: age 35
where 1> and 2> indicate which sub-task (i.e., thread) produced the output.
In production, commonly used sinks include the StreamingFileSink, various databases,
and several pub-sub systems.
### Debugging
In production, your application will run in a remote cluster or set of containers. And if it fails,
it will fail remotely. The JobManager and TaskManager logs can be very helpful in debugging such
failures, but it is much easier to do local debugging inside an IDE, which is something that Flink
supports. You can set breakpoints, examine local variables, and step through your code. You can also
step into Flink's code, which can be a great way to learn more about its internals if you are
curious to see how Flink works.
{% top %}
## Hands-on
At this point you know enough to get started coding and running a simple DataStream application.
Clone the [flink-training repo](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}), and after following the
instructions in the README, do the first exercise:
[Filtering a Stream (Ride Cleansing)](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/ride-cleansing).
{% top %}
## Further Reading
- [Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can](https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html)
- [Anatomy of a Flink Program]({% link dev/datastream_api.md %}#anatomy-of-a-flink-program)
- [Data Sources]({% link dev/datastream_api.md %}#data-sources)
- [Data Sinks]({% link dev/datastream_api.md %}#data-sinks)
- [DataStream Connectors]({% link dev/connectors/index.md %})
{% top %}