{% include JB/setup %}
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
In Zeppelin 0.9, we refactored the Flink interpreter to support the latest version of Flink. Only Flink 1.10+ is supported; older versions of Flink won't work. Apache Flink is supported in Zeppelin with the Flink interpreter group, which consists of the five interpreters listed below.
For beginners, we suggest playing with Flink in the Zeppelin docker container. First you need to download Flink, because there's no Flink binary distribution shipped with Zeppelin. e.g. here we download Flink 1.12.2 to `/mnt/disk1/flink-1.12.2`, mount it into the Zeppelin docker container, and run the following command to start the Zeppelin docker container.
```bash
docker run -u $(id -u) -p 8080:8080 -p 8081:8081 --rm -v /mnt/disk1/flink-1.12.2:/opt/flink -e FLINK_HOME=/opt/flink --name zeppelin apache/zeppelin:0.10.0
```
After running the above command, you can open http://localhost:8080 to play with Flink in Zeppelin. We have only verified Flink local mode in the Zeppelin docker container; other modes may not work due to network issues. `-p 8081:8081` exposes the Flink web UI, so that you can access it via http://localhost:8081.
Here's a screenshot of running the note Flink Tutorial/5. Streaming Data Analytics.
You can also mount a notebook folder to replace the built-in Zeppelin tutorial notes. e.g. here's a repo of the Flink SQL cookbook on Zeppelin: https://github.com/zjffdu/flink-sql-cookbook-on-zeppelin/. You can clone this repo and mount it into the docker container:
```bash
docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zeppelin:/notebook -v /mnt/disk1/flink-1.12.2:/opt/flink -e FLINK_HOME=/opt/flink -e ZEPPELIN_NOTEBOOK_DIR='/notebook' --name zeppelin apache/zeppelin:0.10.0
```
Download Flink 1.10 or later (Scala 2.11 and 2.12 are both supported).
The above diagram shows the architecture of Flink on Zeppelin. The Flink interpreter on the left side is actually a Flink client, which is responsible for compiling Flink jobs and managing their lifecycle: submitting and canceling jobs, monitoring job progress, and so on. The Flink cluster on the right side is where Flink jobs are executed. It could be a MiniCluster (local mode), standalone cluster (remote mode), Yarn session cluster (yarn mode) or Yarn application cluster (yarn-application mode).
There are 2 important components in the Flink interpreter: the Scala shell and the Python shell.
The Flink interpreter can be configured with properties provided by Zeppelin (as shown in the following table). You can also add and set other Flink properties which are not listed in the table. For a list of additional properties, refer to Flink Available Properties.
The default interpreter binding mode is globally shared. That means all notes share the same Flink interpreter, and hence the same Flink cluster. In practice, we recommend the isolated per note mode, in which each note has its own Flink interpreter (and its own Flink cluster) without affecting the others.
Flink in Zeppelin supports 4 execution modes (`flink.execution.mode`):
Running Flink in local mode starts a MiniCluster in the local JVM. By default, the local MiniCluster uses port 8081, so make sure this port is available on your machine; otherwise you can configure `rest.port` to specify another port. You can also specify `local.number-taskmanager` and `flink.tm.slot` to customize the number of TMs and the number of slots per TM, because by default this MiniCluster has only 4 TMs with 1 slot each, which may not be enough for some cases.
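For example, a local-mode setup could be sketched in a `%flink.conf` paragraph like the following (the port and counts are illustrative values, not required defaults):

```
%flink.conf

flink.execution.mode local
local.number-taskmanager 4
flink.tm.slot 4
rest.port 8082
```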
Running Flink in remote mode connects to an existing Flink cluster, which could be a standalone cluster or a yarn session cluster. Besides setting `flink.execution.mode` to `remote`, you also need to set `flink.execution.remote.host` and `flink.execution.remote.port` to point to the Flink JobManager's REST API address.
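A remote-mode configuration could be sketched as follows (the host address is made up for illustration):

```
%flink.conf

flink.execution.mode remote
flink.execution.remote.host 192.168.1.10
flink.execution.remote.port 8081
```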
In order to run Flink in yarn mode, you need to make the following settings:

* Set `flink.execution.mode` to `yarn`
* Set `HADOOP_CONF_DIR` in Flink's interpreter setting or `zeppelin-env.sh`
* Make sure the `hadoop` command is on your `PATH`, because internally Flink will call the command `hadoop classpath` and load all the hadoop-related jars into the Flink interpreter process

In this mode, Zeppelin would launch a Flink yarn session cluster for you and destroy it when you shut down your Flink interpreter.
In the above yarn mode, there will be a separate Flink interpreter process on the Zeppelin server host. However, this may exhaust resources when there are too many interpreter processes. So in practice, we recommend you use yarn application mode if you are using Flink 1.11 or later (yarn application mode is only supported in Flink 1.11 and later). In this mode the Flink interpreter runs in the JobManager, which runs in a yarn container. In order to run Flink in yarn application mode, you need to make the following settings:
* Set `flink.execution.mode` to `yarn-application`
* Set `HADOOP_CONF_DIR` in Flink's interpreter setting or `zeppelin-env.sh`
* Make sure the `hadoop` command is on your `PATH`, because internally Flink will call the command `hadoop classpath` and load all the hadoop-related jars into the Flink interpreter process

Scala is the default language of Flink on Zeppelin (`%flink`), and it is also the entry point of the Flink interpreter. Underneath, the Flink interpreter creates a Scala shell which creates several built-in variables, including ExecutionEnvironment, StreamExecutionEnvironment and so on. So don't create these Flink environment variables again, otherwise you might hit weird issues. The Scala code you write in Zeppelin is submitted to this Scala shell.
Here are the built-in variables created in the Flink Scala shell.
There are 2 planners supported by Flink SQL: `flink` & `blink`.

* `flink` planner (`btenv_2` and `stenv_2`)
* `blink` planner. This is also what the Flink batch/streaming SQL interpreters use (`%flink.bsql` & `%flink.ssql`)

Check this page for the difference between the flink planner and the blink planner.
You can write whatever Scala code you want in Zeppelin. e.g. in the following example, we write a classic streaming word count. You can type Tab for code completion.
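A streaming word count paragraph could be sketched as follows. It assumes the built-in `senv` variable created by the Scala shell; the input elements and job name are made up for illustration:

```scala
%flink

// read from a small in-memory source, split into words, and count per word
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print()

// trigger execution of the streaming job
senv.execute("streaming word count")
```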
ZeppelinContext provides some additional functions and utilities. See Zeppelin-Context for more details. For the Flink interpreter, you can use `z` to display a Flink `Dataset`/`Table`. e.g. you can use `z.show` to display a DataSet, Batch Table, or Stream Table.
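As a sketch, displaying a DataSet could look like the following paragraph (the data is made up, and it assumes `z.show` accepts a DataSet as stated above):

```scala
%flink

// build a tiny DataSet with the built-in ExecutionEnvironment (benv) and render it as a table
val ds = benv.fromElements(("jeff", 25), ("andy", 30))
z.show(ds)
```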
In Zeppelin, there are 2 kinds of Flink SQL interpreters you can use:

* `%flink.ssql`: streaming SQL interpreter, which launches Flink streaming jobs via `StreamTableEnvironment`
* `%flink.bsql`: batch SQL interpreter, which launches Flink batch jobs via `BatchTableEnvironment`
The Flink SQL interpreter in Zeppelin is equal to the Flink sql-client plus many other enhancement features. In the Flink sql-client, you can run either streaming SQL or batch SQL in one session, but not both together. In Zeppelin, you can do both: `%flink.ssql` is used for running streaming SQL, while `%flink.bsql` is used for running batch SQL. Batch and streaming Flink jobs run in the same Flink session cluster.
You can write multiple SQL statements in one paragraph; each SQL statement is separated by a semicolon.
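For example, two DDL statements could be run in one paragraph like this (the table name, schema, and connector are made up for illustration):

```sql
%flink.ssql

drop table if exists log;
create table log (url string, event_ts timestamp(3)) with ('connector' = 'datagen');
```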
2 kinds of SQL comments are supported in Zeppelin:

* `--` for single-line comments
* `/* */` for multi-line comments
You can set the SQL parallelism via the paragraph local property `parallelism`.
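For instance, a paragraph could request a specific parallelism like this (the table and columns are made up for illustration):

```sql
%flink.ssql(parallelism=2)

select url, count(1) as pv from log group by url
```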
Sometimes you have multiple insert statements which read the same source but write to different sinks. By default, each insert statement launches a separate Flink job, but you can set the paragraph local property `runAsOne` to `true` to run them in one single Flink job.
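A sketch of such a paragraph (the source and sink table names are made up for illustration):

```sql
%flink.ssql(runAsOne=true)

-- both inserts read the same source and run in one Flink job
insert into sink_a select * from log;
insert into sink_b select * from log;
```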
You can set the Flink job name for an insert statement via the paragraph local property `jobName`. Note that you can only set the job name for insert statements; select statements are not supported yet. Also, this setting only works for a single insert statement; it doesn't work for the multiple-insert case mentioned above.
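A sketch of naming a single insert job (the job name and table names are made up for illustration):

```sql
%flink.ssql(jobName="my_insert_job")

insert into sink_a select * from log;
```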
Zeppelin can visualize the select SQL result of a Flink streaming job. Overall it supports 3 modes:
Single mode is for the case when the result of the SQL statement is always one row, such as the following example. The output format is HTML, and you can specify the paragraph local property `template` for the final output content template. You can use `{i}` as a placeholder for the i-th column of the result.
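A single-mode paragraph could be sketched like this (the table, columns, and template text are made up for illustration):

```sql
%flink.ssql(type=single, parallelism=1, template=<h1>{1}</h1> events until <h2>{0}</h2>)

select max(event_ts), count(1) from log
```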
Update mode is suitable for the case when the output is more than one row and will be updated continuously. Here's one example where we use group by.
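Such a group-by query could be sketched as follows (the table and columns are made up for illustration):

```sql
%flink.ssql(type=update)

select url, count(1) as pv from log group by url
```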
Append mode is suitable for the scenario where output data is always appended. e.g. the following example, which uses a tumble window.
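A tumble-window query in append mode could be sketched as follows (the table and columns are made up for illustration):

```sql
%flink.ssql(type=append)

select TUMBLE_START(event_ts, INTERVAL '5' SECOND) as start_time, url, count(1) as pv
from log
group by TUMBLE(event_ts, INTERVAL '5' SECOND), url
```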
PyFlink is the Python entry point of Flink on Zeppelin. Internally, the Flink interpreter creates a Python shell which creates Flink's environment variables (including ExecutionEnvironment, StreamExecutionEnvironment and so on). Note that the Java environment behind PyFlink is created in the Scala shell, which means that underneath, the Scala shell and Python shell share the same environment. These are the variables created in the Python shell:
* `s_env` (StreamExecutionEnvironment)
* `b_env` (ExecutionEnvironment)
* `st_env` (StreamTableEnvironment for blink planner)
* `bt_env` (BatchTableEnvironment for blink planner)
* `st_env_2` (StreamTableEnvironment for flink planner)
* `bt_env_2` (BatchTableEnvironment for flink planner)

There are 3 things you need to configure to make PyFlink work in Zeppelin:

* Copy the `python` folder under `${FLINK_HOME}/opt` to `${FLINK_HOME}/lib`.
* Set `zeppelin.pyflink.python` as the python executable path. By default, it is the python in `PATH`. In case you have multiple versions of python installed, you need to configure `zeppelin.pyflink.python` as the python version you want to use.

There are 2 ways to use PyFlink in Zeppelin:

* `%flink.pyflink`
* `%flink.ipyflink`
`%flink.pyflink` is much simpler and easier; you don't need to do anything except the above settings, but its functionality is also limited. We suggest you use `%flink.ipyflink`, which provides almost the same user experience as jupyter.
If you don't have anaconda installed, then you need to install the following 3 libraries:
```bash
pip install jupyter
pip install grpcio
pip install protobuf
```
If you have anaconda installed, then you only need to install the following 2 libraries:
```bash
pip install grpcio
pip install protobuf
```
ZeppelinContext is also available in PyFlink; you can use it almost the same way as in Flink Scala.
Check the Python doc for more features of IPython.
It is very common to have third-party dependencies when you write Flink jobs, in whatever language (Scala, Python, SQL). It is easy to add dependencies in an IDE (e.g. add a dependency in pom.xml), but how can you do that in Zeppelin? Mainly there are 2 settings you can use to add third-party dependencies:

* `flink.execution.packages`
* `flink.execution.jars`
`flink.execution.packages` is the recommended way of adding dependencies. Its implementation is the same as adding dependencies in `pom.xml`: underneath, it downloads all the packages and their transitive dependencies from the maven repository, then puts them on the classpath. Here's one example of how to add the kafka connector of Flink 1.10 via inline configuration:
```
%flink.conf

flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
```
The format is `artifactGroup:artifactId:version`; if you have multiple packages, separate them with commas. `flink.execution.packages` requires internet access, so if you cannot access the internet, you need to use `flink.execution.jars` instead.
If your Zeppelin machine cannot access the internet, or your dependencies are not deployed to a maven repository, then you can use `flink.execution.jars` to specify the jar files you depend on (jar files are separated with commas). Here's one example of how to add kafka dependencies (including the kafka connector and its transitive dependencies) via `flink.execution.jars`:
```
%flink.conf

flink.execution.jars /usr/lib/flink-kafka/target/flink-kafka-1.0-SNAPSHOT.jar
```
There are 4 ways you can define UDFs in Zeppelin.
```scala
%flink

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())
```
It is very straightforward to define a Scala UDF, almost the same as what you would do in an IDE. After creating the UDF class, you need to register it via `btenv`. You can also register it via `stenv`, which shares the same catalog with `btenv`.
```python
%flink.pyflink

class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
```
It is also very straightforward to define a Python UDF, almost the same as what you would do in an IDE. After creating the UDF class, you need to register it via `bt_env`. You can also register it via `st_env`, which shares the same catalog with `bt_env`.
Some simple UDFs can be written directly in Zeppelin. But if the UDF logic is complicated, it is better to write it in an IDE and then register it in Zeppelin as follows:
```sql
%flink.ssql

CREATE FUNCTION myupper AS 'org.apache.zeppelin.flink.udf.JavaUpper';
```
But this approach requires the UDF jar to be on the `CLASSPATH`, so you need to configure `flink.execution.jars` to include this UDF jar on the `CLASSPATH`, such as the following:
```
%flink.conf

flink.execution.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar
```
The above 3 approaches all have some limitations. So when you have many UDFs, or the UDF logic is very complicated and you don't want to register them yourself every time, you can use `flink.udf.jars`: set `flink.udf.jars` to point to the UDF jar you build from your UDF project. For example:
```
%flink.conf

flink.udf.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar
```
Zeppelin would scan this jar, find all the UDF classes, and register them automatically for you. The UDF name is the class name. For example, here's the output of `SHOW FUNCTIONS` after specifying the above UDF jar in `flink.udf.jars`.
By default, Zeppelin scans all the classes in this jar, so it would be pretty slow if your jar is very big, especially when your UDF jar has other dependencies. In this case we recommend you specify `flink.udf.jars.packages` to restrict which packages to scan; this reduces the number of classes scanned and makes UDF detection much faster.
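A sketch of combining the two properties (the jar path matches the example above; the package name is made up for illustration):

```
%flink.conf

flink.udf.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar
flink.udf.jars.packages org.example.udf
```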
In order to use Hive in Flink, you have to make the following settings:

* Set `zeppelin.flink.enableHive` to be `true`
* Set `zeppelin.flink.hive.version` to be the hive version you are using
* Set `HIVE_CONF_DIR` to be the location where `hive-site.xml` is located
* Make sure the hive metastore is started and you have configured `hive.metastore.uris` in `hive-site.xml`
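The settings above could be sketched in a `%flink.conf` paragraph like this (the hive version and conf directory are illustrative values for your environment):

```
%flink.conf

zeppelin.flink.enableHive true
zeppelin.flink.hive.version 2.3.4
HIVE_CONF_DIR /etc/hive/conf
```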
In the section Streaming Data Visualization, we demonstrated the different visualization types via the paragraph local property `type`. In this section, we list and explain all the supported local properties in the Flink interpreter.
Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. You can check for more features in the tutorial notes.
Join our community to discuss with others.