---
layout: page
title: "Flink Interpreter for Apache Zeppelin"
description: "Apache Flink is an open source platform for distributed stream and batch data processing."
group: interpreter
---

{% include JB/setup %}

Flink interpreter for Apache Zeppelin

Overview

Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.

In Zeppelin 0.9, we refactored the Flink interpreter to support the latest version of Flink. Only Flink 1.10+ is supported; older versions of Flink may not work. Apache Flink is supported in Zeppelin with the Flink interpreter group, which consists of the five interpreters below (%flink, %flink.bsql, %flink.ssql, %flink.pyflink and %flink.ipyflink).

Prerequisites

  • Download Flink 1.10 for Scala 2.11 (only Scala 2.11 is supported; Scala 2.12 is not supported yet in Zeppelin)
  • Download flink-hadoop-shaded and put it under the lib folder of Flink (the Flink interpreter needs it to support yarn mode)

Configuration

The Flink interpreter can be configured with properties provided by Zeppelin. You can also set other Flink properties which are not provided by Zeppelin; for a list of additional properties, refer to Flink Available Properties.
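
Besides the Zeppelin-provided properties, any Flink configuration key can be added as an extra interpreter property. A minimal sketch of an interpreter setting (the values are only illustrative; taskmanager.memory.process.size is a standard Flink configuration key):

flink.execution.mode                local
taskmanager.memory.process.size     1024m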

StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment

Zeppelin will create 6 variables as the entry point of Flink Scala (%flink); a short usage sketch follows the two lists below:

  • senv (StreamExecutionEnvironment)
  • benv (ExecutionEnvironment)
  • stenv (StreamTableEnvironment for blink planner)
  • btenv (BatchTableEnvironment for blink planner)
  • stenv_2 (StreamTableEnvironment for flink planner)
  • btenv_2 (BatchTableEnvironment for flink planner)

And it will create 6 variables as the entry point of PyFlink (%flink.pyflink or %flink.ipyflink):

  • s_env (StreamExecutionEnvironment)
  • b_env (ExecutionEnvironment)
  • st_env (StreamTableEnvironment for blink planner)
  • bt_env (BatchTableEnvironment for blink planner)
  • st_env_2 (StreamTableEnvironment for flink planner)
  • bt_env_2 (BatchTableEnvironment for flink planner)
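
For example, a minimal sketch of a %flink paragraph that uses benv for a small batch word count (the import is added explicitly in case the Scala DataSet API implicits are not already in scope):

%flink

import org.apache.flink.api.scala._

// build a small DataSet with the pre-created ExecutionEnvironment (benv)
// and run a word count on it
val words = benv.fromElements("hello world", "hello flink")
words.flatMap(_.toLowerCase.split("\\s+"))
     .map((_, 1))
     .groupBy(0)
     .sum(1)
     .print()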

Execution mode (Local/Remote/Yarn)

Flink in Zeppelin supports 3 execution modes (flink.execution.mode):

  • Local
  • Remote
  • Yarn

Run Flink in Local Mode

Running Flink in local mode will start a MiniCluster in the local JVM. By default, the local MiniCluster uses port 8081, so make sure this port is available on your machine; otherwise, configure rest.port to specify another port. You can also set local.number-taskmanager and flink.tm.slot to customize the number of task managers and the number of slots per task manager, because by default there are only 4 TMs with 1 slot each, which may not be enough for some cases.
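
For example, a sketch of local-mode properties in the Flink interpreter setting (the values are only illustrative):

flink.execution.mode        local
rest.port                   8082
local.number-taskmanager    4
flink.tm.slot               4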

Run Flink in Remote Mode

Running Flink in remote mode will connect to an existing Flink cluster, which could be a standalone cluster or a yarn session cluster. Besides setting flink.execution.mode to remote, you also need to specify flink.execution.remote.host and flink.execution.remote.port to point to the Flink job manager.
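
For example, a sketch of remote-mode properties (the host and port below are placeholders for your own job manager address):

flink.execution.mode           remote
flink.execution.remote.host    jm-host.example.com
flink.execution.remote.port    8081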

Run Flink in Yarn Mode

In order to run Flink in yarn mode, you need to make the following settings (a sketch follows this list):

  • Set flink.execution.mode to yarn
  • Set HADOOP_CONF_DIR in the Flink interpreter setting.
  • Make sure the hadoop command is on your PATH, because internally Flink will run the command hadoop classpath and load all the Hadoop-related jars into the Flink interpreter process.
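
A minimal sketch of yarn-mode properties in the interpreter setting (the HADOOP_CONF_DIR path is only an example):

flink.execution.mode    yarn
HADOOP_CONF_DIR         /etc/hadoop/conf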

Blink/Flink Planner

There are 2 planners supported by Flink's Table API: flink & blink.

  • If you want to use the DataSet API and convert a DataSet to a Flink table, use the flink planner (btenv_2 and stenv_2); see the sketch after this list.
  • In all other cases, we recommend using the blink planner, which is also what the Flink batch/streaming SQL interpreters (%flink.bsql & %flink.ssql) use.
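
For example, a minimal sketch of converting a DataSet into a Table with the flink planner (the data here is made up for illustration):

%flink

import org.apache.flink.api.scala._

// the flink planner (btenv_2) is required for DataSet <-> Table conversion
val ds = benv.fromElements((1, "hello"), (2, "world"))
val table = btenv_2.fromDataSet(ds)
table.printSchema()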

How to use Hive

In order to use Hive in Flink, you have to make the following settings.

  • Set zeppelin.flink.enableHive to be true
  • Set zeppelin.flink.hive.version to be the Hive version you are using.
  • Set HIVE_CONF_DIR to be the location where hive-site.xml is located. Make sure the Hive metastore is started and you have configured hive.metastore.uris in hive-site.xml.
  • Copy the following dependencies to the lib folder of the Flink installation. 
    • flink-connector-hive_2.11-1.10.0.jar
    • flink-hadoop-compatibility_2.11-1.10.0.jar
    • hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)

After these settings, you will be able to query Hive tables via either the Table API (%flink) or batch SQL (%flink.bsql).
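
A minimal sketch with %flink.bsql, assuming the Hive integration above is configured and a Hive table named my_hive_table already exists (the table name is hypothetical):

%flink.bsql

-- query an existing Hive table from the batch SQL interpreter
select * from my_hive_table limit 10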

Flink Batch SQL

%flink.bsql is used for Flink's batch SQL. Just type help to get all the available commands.

  • Use insert into statements for batch ETL
  • Use select statements for exploratory data analytics (see the examples below)
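
A minimal sketch of both cases (source_table and sink_table are hypothetical tables that would need to be defined beforehand, e.g. via DDL or the Hive catalog):

%flink.bsql

-- batch ETL: write the result of a query into another table
insert into sink_table
select user_id, count(*) as cnt
from source_table
group by user_id

%flink.bsql

-- exploratory data analytics: the query result is displayed in Zeppelin
select user_id, count(*) as cnt
from source_table
group by user_id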

Flink Streaming SQL

%flink.ssql is used for Flink's streaming SQL. Just type help to get all the available commands. Mainly there are 2 cases:

  • Use insert into statements for streaming processing
  • Use select statements for streaming data analytics (see the example below)
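
A minimal sketch of the streaming processing case (source_table and sink_table are hypothetical streaming tables, e.g. backed by Kafka connectors, that would need to be defined beforehand):

%flink.ssql

-- continuously read from source_table and write the filtered rows to sink_table
insert into sink_table
select user_id, event_time
from source_table
where user_id is not null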

Flink UDF

You can use Flink Scala UDFs or Python UDFs in SQL. UDFs for batch and streaming SQL are the same. Here are 2 examples.

  • Scala UDF
%flink

import org.apache.flink.table.functions.ScalarFunction

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}
// register the function so it can be used in SQL as scala_upper(...)
btenv.registerFunction("scala_upper", new ScalaUpper())

  • Python UDF

%flink.pyflink

from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction

class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

# register the function so it can be used in SQL as python_upper(...)
bt_env.register_function("python_upper",
                         udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

Besides defining UDFs in Zeppelin, you can also load UDFs from jars via flink.udf.jars. For example, you can create UDFs in IntelliJ and build them into one jar. After that, set flink.udf.jars to point to this jar, and the Flink interpreter will detect all the UDFs in this jar and register them in the TableEnvironment; the UDF name is the class name.
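
A sketch of the property (the jar path is only an example):

flink.udf.jars    /path/to/your-udfs.jar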

ZeppelinContext

Zeppelin automatically injects ZeppelinContext as variable z in your Scala/Python environment. ZeppelinContext provides some additional functions and utilities. See Zeppelin-Context for more details.
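
For example, a small sketch of using z from a %flink paragraph (the form name maxRows is only an illustration):

%flink

// create a dynamic text form in the paragraph and read its value
val maxRows = z.input("maxRows", "10").toString.toInt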

IPython Support

By default, Zeppelin would use IPython in %flink.pyflink when IPython is available; otherwise it would fall back to the original Python implementation. For the IPython features, you can refer to the doc Python Interpreter.

Tutorial Notes

Zeppelin ships with several Flink tutorial notes which may be helpful for you. Except for the first one, the other 4 notes below cover the 4 main scenarios of Flink.

  • Flink Basic
  • Batch ETL
  • Exploratory Data Analytics
  • Streaming ETL
  • Streaming Data Analytics