---
layout: page
title: "Flink Interpreter for Apache Zeppelin"
description: "Apache Flink is an open source platform for distributed stream and batch data processing."
group: interpreter
---

{% include JB/setup %}

# Flink Interpreter for Apache Zeppelin

## Overview

Apache Flink is an open source platform for distributed stream and batch data processing. Flink's core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.

Apache Flink is supported in Zeppelin with the Flink interpreter group, which consists of the five interpreters below: %flink (Scala), %flink.pyflink (Python), %flink.ipyflink (IPython), %flink.ssql (streaming SQL), and %flink.bsql (batch SQL).

## Configuration

The Flink interpreter can be configured with properties provided by Zeppelin. You can also set other Flink properties that are not listed in the table. For a list of additional properties, refer to Flink Available Properties.

## StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment

Zeppelin creates four variables to represent Flink's entry points:

* senv (StreamExecutionEnvironment)
* env (ExecutionEnvironment)
* stenv (StreamTableEnvironment)
* btenv (BatchTableEnvironment)
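
These variables can be used directly in a Zeppelin paragraph. As a minimal sketch of a streaming word count on senv (the input elements are illustrative, and the code assumes the Flink 1.9-era Scala API):

```scala
%flink

// senv is the injected StreamExecutionEnvironment
val words = senv.fromElements("hello world", "hello flink")

words
  .flatMap(line => line.split("\\s+"))
  .map(w => (w, 1))
  .keyBy(0)  // key by the word
  .sum(1)    // sum the counts per word
  .print()

senv.execute("word count")
```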

## Flink Planner

Starting from Flink 1.9, there are two planners supported by Flink's Table API: flink and blink.

* If you want to use the DataSet API, use the flink planner (set zeppelin.flink.planner to flink).
* In all other cases, we recommend the blink planner, which is also the default value of zeppelin.flink.planner.
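
For example, to run DataSet programs you would switch the planner in the interpreter setting (a sketch of the relevant property entry):

```
zeppelin.flink.planner = flink
```

Leave the property at its default value, blink, in all other cases.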

## How to use Hive

In order to use Hive in Flink, you have to make several settings.

* Set zeppelin.flink.enableHive to true.
* Copy the necessary dependencies to Flink's lib folder; check this link for more details:
  * flink-connector-hive_{scala_version}-{flink.version}.jar
  * flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
  * flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
  * hive-exec-2.x.jar (for Hive 1.x, copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar instead)
* Specify HIVE_CONF_DIR either in the Flink interpreter setting or in zeppelin-env.sh.
* Specify zeppelin.flink.hive.version; by default it is 2.3.4. If you are using Hive 1.2.x, you need to set it to 1.2.2.
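
Once these settings are in place, a batch SQL paragraph along the following lines should work; the database and table names below are hypothetical:

```sql
%flink.bsql

use catalog hive;
show tables;
select * from my_db.my_table limit 10;
```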

After these settings, you will be able to query Hive tables via either the Table API (%flink) or batch SQL (%flink.bsql).

## ZeppelinContext

Zeppelin automatically injects ZeppelinContext as variable z in your Scala/Python environment. ZeppelinContext provides some additional functions and utilities. See Zeppelin-Context for more details.
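
For instance, z.show can render a Flink table with Zeppelin's built-in visualizations, and z.input adds a dynamic form to the paragraph; a sketch, with a hypothetical table name:

```scala
%flink

// render a Table API query result with Zeppelin's table visualization
val result = stenv.sqlQuery("select * from my_table")
z.show(result)

// read a user-supplied value from a dynamic input form
val limit = z.input("limit", "10")
```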

## IPython support

By default, Zeppelin uses IPython in %flink.pyflink when IPython is available; otherwise it falls back to the vanilla Python implementation. If you don't want to use IPython, set zeppelin.pyflink.useIPython to false in the interpreter setting. For the IPython features, refer to the Python Interpreter documentation.
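
As a sketch of a %flink.pyflink paragraph (the Python-side variable name bt_env mirroring the Scala btenv is an assumption here, and the data is illustrative):

```python
%flink.pyflink

# bt_env is assumed to be the injected BatchTableEnvironment on the Python side
table = bt_env.from_elements([(1, 'hello'), (2, 'world')], ['id', 'word'])
z.show(table)
```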

## Tutorial Notes

Zeppelin ships with several Flink tutorial notes that may be helpful to you.