---
layout: page
title: "Flink Interpreter for Apache Zeppelin"
description: "Apache Flink is an open source platform for distributed stream and batch data processing."
group: interpreter
---

{% include JB/setup %}

Flink interpreter for Apache Zeppelin

Overview

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

In Zeppelin 0.9, the Flink interpreter was refactored to support the latest versions of Flink. Only Flink 1.10+ is supported; older versions of Flink won't work. Apache Flink is supported in Zeppelin with the Flink interpreter group, which consists of the five interpreters listed below.

  • %flink - Provides a Scala environment
  • %flink.pyflink - Provides a Python environment
  • %flink.ipyflink - Provides an IPython environment
  • %flink.ssql - Provides a streaming SQL environment
  • %flink.bsql - Provides a batch SQL environment

Main Features

Play Flink in Zeppelin docker

For beginners, we suggest playing with Flink in the Zeppelin docker container. First you need to download Flink, because no Flink binary distribution is shipped with Zeppelin. e.g. here we download Flink 1.12.2 to /mnt/disk1/flink-1.12.2, mount it into the Zeppelin docker container and run the following command to start the Zeppelin docker container.

docker run -u $(id -u) -p 8080:8080 -p 8081:8081 --rm -v /mnt/disk1/flink-1.12.2:/opt/flink -e FLINK_HOME=/opt/flink  --name zeppelin apache/zeppelin:0.10.0

After running the above command, you can open http://localhost:8080 to play with Flink in Zeppelin. We have only verified Flink local mode in the Zeppelin docker container; other modes may not work due to network issues. -p 8081:8081 exposes the Flink web UI, so that you can access it via http://localhost:8081.

Here's a screenshot of running the note Flink Tutorial/5. Streaming Data Analytics.

You can also mount a notebook folder to replace the built-in Zeppelin tutorial notebooks. e.g. here's a repo of the Flink SQL cookbook on Zeppelin: https://github.com/zjffdu/flink-sql-cookbook-on-zeppelin/

You can clone this repo and mount it into the docker container:

docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zeppelin:/notebook -v /mnt/disk1/flink-1.12.2:/opt/flink -e FLINK_HOME=/opt/flink  -e ZEPPELIN_NOTEBOOK_DIR='/notebook' --name zeppelin apache/zeppelin:0.10.0

Prerequisites

Download Flink 1.10 or later (Scala 2.11 and 2.12 are both supported).

Flink on Zeppelin Architecture

The above diagram shows the architecture of Flink on Zeppelin. The Flink interpreter on the left side is actually a Flink client which is responsible for compiling Flink jobs and managing their lifecycle, such as submitting and cancelling jobs, monitoring job progress, and so on. The Flink cluster on the right side is where the Flink jobs execute. It could be a MiniCluster (local mode), a standalone cluster (remote mode), a yarn session cluster (yarn mode) or a yarn application cluster (yarn-application mode).

There are 2 important components in the Flink interpreter: the Scala shell and the Python shell.

  • The Scala shell is the entry point of the Flink interpreter. It creates all the entry points of a Flink program, such as ExecutionEnvironment, StreamExecutionEnvironment and TableEnvironment, and is responsible for compiling and running Scala code and SQL.
  • The Python shell is the entry point of PyFlink and is responsible for compiling and running Python code.

Configuration

The Flink interpreter can be configured with properties provided by Zeppelin (as in the following table). You can also add and set other Flink properties which are not listed in the table. For a list of additional properties, refer to Flink Available Properties.
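
For example, these properties can also be set inline via a %flink.conf paragraph (run before the interpreter is started). The values below are only illustrative assumptions, not recommendations:

%flink.conf

flink.execution.mode local
taskmanager.memory.process.size 2048m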

Interpreter Binding Mode

The default interpreter binding mode is globally shared. That means all notes share the same Flink interpreter and therefore the same Flink cluster. In practice, we recommend using isolated per note mode, which means each note has its own Flink interpreter without affecting the others (each one has its own Flink cluster).

Execution Mode

Flink in Zeppelin supports 4 execution modes (flink.execution.mode):

  • Local
  • Remote
  • Yarn
  • Yarn Application

Local Mode

Running Flink in local mode will start a MiniCluster in the local JVM. By default, the local MiniCluster uses port 8081, so make sure this port is available on your machine; otherwise you can configure rest.port to specify another port. You can also specify local.number-taskmanager and flink.tm.slot to customize the number of TMs and the number of slots per TM, because by default the MiniCluster has only 4 TMs with 1 slot each, which may not be enough for some cases.
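
As a minimal sketch (the port and numbers below are example values for illustration, not recommendations):

%flink.conf

flink.execution.mode local
rest.port 8082
local.number-taskmanager 2
flink.tm.slot 4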

Remote Mode

Running Flink in remote mode will connect to an existing Flink cluster, which could be a standalone cluster or a yarn session cluster. Besides setting flink.execution.mode to remote, you also need to specify flink.execution.remote.host and flink.execution.remote.port to point to the Flink JobManager's rest api address.
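
For example (the host and port below are placeholders for your own JobManager rest endpoint):

%flink.conf

flink.execution.mode remote
flink.execution.remote.host jobmanager-host.example.com
flink.execution.remote.port 8081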

Yarn Mode

In order to run Flink in Yarn mode, you need to make the following settings:

  • Set flink.execution.mode to be yarn
  • Set HADOOP_CONF_DIR in Flink's interpreter setting or zeppelin-env.sh.
  • Make sure the hadoop command is on your PATH, because internally Flink will call the command hadoop classpath and load all the hadoop related jars into the Flink interpreter process

In this mode, Zeppelin will launch a Flink yarn session cluster for you and destroy it when you shut down your Flink interpreter.
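
A minimal sketch of such a configuration (the HADOOP_CONF_DIR path is an assumption for your environment, and it can equally be set in the interpreter setting or zeppelin-env.sh as described above):

%flink.conf

flink.execution.mode yarn
HADOOP_CONF_DIR /etc/hadoop/conf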

Yarn Application Mode

In the above yarn mode, there is a separate Flink interpreter process on the Zeppelin server host. However, this may exhaust resources when there are too many interpreter processes. So in practice, we recommend using yarn application mode if you are using Flink 1.11 or later (yarn application mode is only supported in Flink 1.11 and later). In this mode the Flink interpreter runs in the JobManager, which runs in a yarn container. In order to run Flink in yarn application mode, you need to make the following settings (see the sketch after the list):

  • Set flink.execution.mode to be yarn-application
  • Set HADOOP_CONF_DIR in Flink's interpreter setting or zeppelin-env.sh.
  • Make sure the hadoop command is on your PATH, because internally Flink will call the command hadoop classpath and load all the hadoop related jars into the Flink interpreter process
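
A minimal sketch of the interpreter configuration for this mode (the memory value is only an illustrative assumption):

%flink.conf

flink.execution.mode yarn-application
jobmanager.memory.process.size 2048m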

Flink Scala

Scala is the default language of Flink on Zeppelin (%flink), and it is also the entry point of the Flink interpreter. Under the hood, the Flink interpreter creates a Scala shell which creates several built-in variables, including ExecutionEnvironment, StreamExecutionEnvironment and so on. So don't create these Flink environment variables again, otherwise you might hit weird issues. The Scala code you write in Zeppelin is submitted to this Scala shell.
Here are the built-in variables created in the Flink Scala shell.

  • senv (StreamExecutionEnvironment)
  • benv (ExecutionEnvironment)
  • stenv (StreamTableEnvironment for blink planner)
  • btenv (BatchTableEnvironment for blink planner)
  • stenv_2 (StreamTableEnvironment for flink planner)
  • btenv_2 (BatchTableEnvironment for flink planner)
  • z (ZeppelinContext)
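
For instance, a minimal sketch that uses the pre-created benv directly (assuming the shell's default implicit imports, as in the Flink Scala shell):

%flink

// use the pre-created benv instead of building a new ExecutionEnvironment
val doubled = benv.fromElements(1, 2, 3).map(_ * 2)
doubled.collect()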

Blink/Flink Planner

There are 2 planners supported by Flink SQL: flink & blink.

  • If you want to use the DataSet API and convert it to a Flink table, then use the flink planner (btenv_2 and stenv_2).
  • In other cases, we would always recommend the blink planner. This is also what the Flink batch/streaming SQL interpreters (%flink.bsql & %flink.ssql) use.

Check this page for the difference between flink planner and blink planner.

Stream WordCount Example

You can write any Scala code in Zeppelin.

e.g. the following is a classic streaming word count example.
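
A minimal sketch of such a word count using the built-in senv, assuming a local socket source (e.g. started with nc -lk 9999):

%flink

// read lines from a local socket source (hypothetical; start one with: nc -lk 9999)
val text = senv.socketTextStream("localhost", 9999)

// split lines into words and count occurrences per word
val counts = text
  .flatMap(_.toLowerCase.split("\\s+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(_._1)
  .sum(1)

counts.print()
senv.execute("Streaming WordCount")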

Code Completion

You can type tab for code completion.

ZeppelinContext

ZeppelinContext provides some additional functions and utilities. See Zeppelin-Context for more details. For the Flink interpreter, you can use z to display Flink DataSets and Tables.

e.g. you can use z.show to display a DataSet, a Batch Table, or a Stream Table.

  • z.show(DataSet)
  • z.show(Batch Table)
  • z.show(Stream Table)
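
For example, a minimal sketch that displays a small DataSet (the data is made up for illustration):

%flink

// build a tiny DataSet with the built-in benv and render it as a Zeppelin table
val ds = benv.fromElements((1, "hello"), (2, "world"))
z.show(ds)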

Flink SQL

In Zeppelin, there are 2 kinds of Flink SQL interpreters you can use:

  • %flink.ssql Streaming SQL interpreter which launches Flink streaming jobs via StreamTableEnvironment
  • %flink.bsql Batch SQL interpreter which launches Flink batch jobs via BatchTableEnvironment

The Flink SQL interpreter in Zeppelin is equivalent to the Flink SQL-client plus many other enhancement features.

Enhancement SQL Features

Support batch SQL and streaming SQL together.

In the Flink SQL-client, you can run either streaming SQL or batch SQL in one session, but not both. In Zeppelin, you can: %flink.ssql is used for running streaming SQL, while %flink.bsql is used for running batch SQL. Batch and streaming Flink jobs run in the same Flink session cluster.

Support multiple statements

You can write multiple SQL statements in one paragraph; each SQL statement is separated by a semicolon.
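
For example (the table name and schema are hypothetical; the datagen connector requires Flink 1.11+):

%flink.bsql

-- both statements below run in order within one paragraph
DROP TABLE IF EXISTS my_table;

CREATE TABLE my_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen'
);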

Comment support

Two kinds of SQL comments are supported in Zeppelin:

  • Single line comments start with --
  • Multi-line comments are wrapped in /* */

Job parallelism setting

You can set the SQL parallelism via the paragraph local property parallelism.
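
For example (source_table is a hypothetical table):

%flink.ssql(parallelism=2)

select count(1) from source_table;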

Support multiple insert

Sometimes you have multiple insert statements which read from the same source but write to different sinks. By default, each insert statement launches a separate Flink job, but you can set the paragraph local property runAsOne to true to run them in one single Flink job.
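
For example (the source and sink tables are hypothetical):

%flink.ssql(runAsOne=true)

-- both inserts below are compiled into one Flink job
INSERT INTO sink_a SELECT * FROM source_table;

INSERT INTO sink_b SELECT * FROM source_table;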

Set job name

You can set the Flink job name for an insert statement via the paragraph local property jobName. Note that you can only set the job name for insert statements; select statements are not supported yet. Also, this setting only works for a single insert statement; it doesn't work with the multiple-insert feature mentioned above.
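
For example (the table names are hypothetical):

%flink.ssql(jobName="my_insert_job")

INSERT INTO sink_table SELECT * FROM source_table;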

Streaming Data Visualization

Zeppelin can visualize the select SQL results of Flink streaming jobs. Overall it supports 3 modes:

  • Single
  • Update
  • Append

Single Mode

Single mode is for the case where the result of the SQL statement is always a single row, such as the following example. The output format is HTML, and you can specify the paragraph local property template for the final output content template. You can use {i} as a placeholder for the i-th column of the result.
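
A minimal sketch (the table and columns are hypothetical):

%flink.ssql(type=single, parallelism=1, refreshInterval=3000, template=Total count is <h1>{1}</h1> until <h2>{0}</h2>)

select max(event_ts), count(1) from source_table;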

Update Mode

Update mode is suitable for the case where the output is more than one row and is updated continuously. Here's one example where we use group by.
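
A minimal sketch (the table and columns are hypothetical):

%flink.ssql(type=update)

select url, count(1) as pv from log_table group by url;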

Append Mode

Append mode is suitable for the scenario where output data is always appended, e.g. the following example which uses a tumble window.
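
A minimal sketch (the table, columns and event-time attribute are hypothetical):

%flink.ssql(type=append)

select TUMBLE_START(event_ts, INTERVAL '5' SECOND) as start_time, url, count(1) as pv
from log_table
group by TUMBLE(event_ts, INTERVAL '5' SECOND), url;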

PyFlink

PyFlink is the Python entry point of Flink on Zeppelin. Internally, the Flink interpreter creates a Python shell which creates Flink's environment variables (including ExecutionEnvironment, StreamExecutionEnvironment and so on). Note that the Java environment behind PyFlink is created in the Scala shell, so the Scala shell and the Python shell share the same underlying environment. These are the variables created in the Python shell.

  • s_env (StreamExecutionEnvironment)
  • b_env (ExecutionEnvironment)
  • st_env (StreamTableEnvironment for blink planner)
  • bt_env (BatchTableEnvironment for blink planner)
  • st_env_2 (StreamTableEnvironment for flink planner)
  • bt_env_2 (BatchTableEnvironment for flink planner)

Configure PyFlink

There are 3 things you need to configure to make PyFlink work in Zeppelin.

  • Install pyflink, e.g. pip install apache-flink==1.11.1. If you need to use PyFlink UDFs, then you need to install pyflink on all the task manager nodes. That means if you are using yarn, then all the yarn nodes need to have pyflink installed.
  • Copy the python folder under ${FLINK_HOME}/opt to ${FLINK_HOME}/lib.
  • Set zeppelin.pyflink.python to the python executable path. By default, it is the python in PATH. In case you have multiple versions of python installed, you need to configure zeppelin.pyflink.python as the python version you want to use.

How to use PyFlink

There are 2 ways to use PyFlink in Zeppelin:

  • %flink.pyflink
  • %flink.ipyflink

%flink.pyflink is simpler and easier: you don't need to do anything beyond the above settings, but its functionality is also limited. We suggest you use %flink.ipyflink, which provides almost the same user experience as jupyter.

Configure IPyFlink

If you don't have anaconda installed, then you need to install the following 3 libraries.

pip install jupyter
pip install grpcio
pip install protobuf

If you have anaconda installed, then you only need to install the following 2 libraries.

pip install grpcio
pip install protobuf

ZeppelinContext is also available in PyFlink, you can use it almost the same as in Flink Scala.

Check the Python doc for more features of IPython.

Third party dependencies

It is very common to have third party dependencies when you write Flink jobs, in whatever language (Scala, Python, SQL). It is very easy to add dependencies in an IDE (e.g. add a dependency in pom.xml), but how can you do that in Zeppelin? Mainly there are 2 settings you can use to add third party dependencies:

  • flink.execution.packages
  • flink.execution.jars

flink.execution.packages

This is the recommended way of adding dependencies. Its implementation is the same as adding dependencies in pom.xml. Underneath it downloads all the packages and their transitive dependencies from the maven repository, then puts them on the classpath. Here's one example of how to add the kafka connector of Flink 1.10 via inline configuration.

%flink.conf

flink.execution.packages  org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0

The format is artifactGroup:artifactId:version; if you have multiple packages, separate them with commas. flink.execution.packages requires internet access, so if you can not access the internet, you need to use flink.execution.jars instead.

flink.execution.jars

If your Zeppelin machine can not access the internet or your dependencies are not deployed to a maven repository, then you can use flink.execution.jars to specify the jar files you depend on (jar files are separated by commas).

Here's one example of how to add kafka dependencies (including the kafka connector and its transitive dependencies) via flink.execution.jars.

%flink.conf

flink.execution.jars /usr/lib/flink-kafka/target/flink-kafka-1.0-SNAPSHOT.jar

Flink UDF

There are 4 ways you can define a UDF in Zeppelin.

  • Write Scala UDF
  • Write PyFlink UDF
  • Create UDF via SQL
  • Configure udf jar via flink.udf.jars

Scala UDF

%flink

import org.apache.flink.table.functions.ScalarFunction

class ScalaUpper extends ScalarFunction {
  def eval(str: String): String = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())

It is very straightforward to define a Scala UDF, almost the same as what you would do in an IDE. After creating the UDF class, you need to register it via btenv. You can also register it via stenv, which shares the same Catalog with btenv.
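
After registration, the UDF can be used in SQL; for example (my_table and its name column are hypothetical):

%flink.bsql

select scala_upper(name) from my_table;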

Python UDF


%flink.pyflink

from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction

class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

It is also very straightforward to define a Python UDF, almost the same as what you would do in an IDE. After creating the UDF class, you need to register it via bt_env. You can also register it via st_env, which shares the same Catalog with bt_env.

UDF via SQL

Some simple UDFs can be written in Zeppelin, but if the UDF logic is very complicated, then it is better to write it in an IDE and register it in Zeppelin as follows:

%flink.ssql

CREATE FUNCTION myupper AS 'org.apache.zeppelin.flink.udf.JavaUpper';

But this approach requires the UDF jar to be on the CLASSPATH, so you need to configure flink.execution.jars to include this UDF jar on the CLASSPATH, such as the following:

%flink.conf

flink.execution.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar

flink.udf.jars

The above 3 approaches all have some limitations:

  • It is suitable to write simple Scala or Python UDFs in Zeppelin, but not very complicated ones, because a notebook doesn't provide the advanced features an IDE does, such as package management, code navigation and so on.
  • It is not easy to share UDFs between notes or users; you have to run the paragraph defining the UDF in each Flink interpreter.

So when you have many UDFs, or the UDF logic is very complicated and you don't want to register them yourself every time, you can use flink.udf.jars:

  • Step 1. Create a udf project in your IDE, write your udf there.
  • Step 2. Set flink.udf.jars to point to the udf jar you build from your udf project

For example,

%flink.conf

flink.udf.jars /usr/lib/flink-udf-1.0-SNAPSHOT.jar

Zeppelin will scan this jar, find all the UDF classes and register them automatically for you. The UDF name is the class name. For example, here's the output of show functions after specifying the above UDF jar in flink.udf.jars.

By default, Zeppelin scans all the classes in this jar, so it can be pretty slow if your jar is very big, especially when your UDF jar has other dependencies. In this case we recommend setting flink.udf.jars.packages to specify the packages to scan; this reduces the number of classes to scan and makes UDF detection much faster.

How to use Hive

In order to use Hive in Flink, you have to make the following settings (a configuration sketch follows the list).

  • Set zeppelin.flink.enableHive to be true
  • Set zeppelin.flink.hive.version to be the hive version you are using.
  • Set HIVE_CONF_DIR to be the location where hive-site.xml is located. Make sure hive metastore is started and you have configured hive.metastore.uris in hive-site.xml
  • Copy the following dependencies to the lib folder of flink installation.
    • flink-connector-hive_2.11-*.jar
    • flink-hadoop-compatibility_2.11-*.jar
    • hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
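
A minimal sketch of the interpreter configuration (the hive version and path below are assumptions for your environment; HIVE_CONF_DIR can also be set in the interpreter setting or zeppelin-env.sh):

%flink.conf

zeppelin.flink.enableHive true
zeppelin.flink.hive.version 2.3.4
HIVE_CONF_DIR /etc/hive/conf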

Paragraph local properties

In the section on Streaming Data Visualization, we demonstrated the different visualization types via the paragraph local property type. In this section, we list and explain all the supported local properties in the Flink interpreter.

Tutorial Notes

Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. You can check for more features in the tutorial notes.

Community

Join our community to discuss with others.