{% include JB/setup %}
Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
In Zeppelin 0.9, the Flink interpreter was refactored to support the latest version of Flink. Only Flink 1.10+ is supported; older versions of Flink may not work. Apache Flink is supported in Zeppelin through the Flink interpreter group, which consists of five interpreters.
The Flink interpreter can be configured with properties provided by Zeppelin (see the following table). You can also set other Flink properties that are not listed in the table. For a list of additional properties, refer to Flink Available Properties.
Zeppelin creates 6 variables as the entry points for Flink Scala (`%flink`):

- `senv` (StreamExecutionEnvironment)
- `benv` (ExecutionEnvironment)
- `stenv` (StreamTableEnvironment for blink planner)
- `btenv` (BatchTableEnvironment for blink planner)
- `stenv_2` (StreamTableEnvironment for flink planner)
- `btenv_2` (BatchTableEnvironment for flink planner)

It also creates 6 variables as the entry points for PyFlink (`%flink.pyflink` or `%flink.ipyflink`):

- `s_env` (StreamExecutionEnvironment)
- `b_env` (ExecutionEnvironment)
- `st_env` (StreamTableEnvironment for blink planner)
- `bt_env` (BatchTableEnvironment for blink planner)
- `st_env_2` (StreamTableEnvironment for flink planner)
- `bt_env_2` (BatchTableEnvironment for flink planner)

Flink in Zeppelin supports 3 execution modes (`flink.execution.mode`): local, remote, and yarn.
Running Flink in local mode starts a MiniCluster in the local JVM. By default, the local MiniCluster uses port 8081, so make sure this port is available on your machine; otherwise, set `rest.port` to specify another port. You can also set `local.number-taskmanager` and `flink.tm.slot` to customize the number of TaskManagers (TMs) and the number of slots per TM. The default is only 4 TMs with 1 slot each, which may not be enough in some cases.
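As a rough illustration of how these two properties determine capacity, the sketch below computes the total number of task slots of a local MiniCluster. The helper names `total_slots` and `can_run` and the dictionary layout are hypothetical and not part of Zeppelin or Flink; only the property names and the defaults (4 TMs, 1 slot each) come from the text above. The parallelism check assumes Flink's default slot sharing, where a job needs as many slots as its highest parallelism.

```python
# Hypothetical helpers for illustration only; not a Zeppelin or Flink API.
DEFAULTS = {"local.number-taskmanager": 4, "flink.tm.slot": 1}


def total_slots(props):
    """Return (number of TMs) x (slots per TM) for a local MiniCluster."""
    merged = {**DEFAULTS, **props}
    return int(merged["local.number-taskmanager"]) * int(merged["flink.tm.slot"])


def can_run(props, parallelism):
    """Assumes default slot sharing: a job needs `parallelism` free slots."""
    return parallelism <= total_slots(props)


if __name__ == "__main__":
    # With the defaults there are 4 slots, so parallelism 8 does not fit.
    print(total_slots({}), can_run({}, 8))
    # Doubling the slots per TM gives 8 slots.
    print(can_run({"flink.tm.slot": 2}, 8))
```

If a paragraph fails because there are not enough slots, raising either property in the interpreter setting is the usual first thing to try.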
Running Flink in remote mode connects to an existing Flink cluster, which can be a standalone cluster or a YARN session cluster. Besides setting `flink.execution.mode` to `remote`, you also need to set `flink.execution.remote.host` and `flink.execution.remote.port` to point to the Flink JobManager.
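Before switching an interpreter to remote mode, it can help to confirm that the configured host and port actually accept connections. The following is a minimal sketch using only the Python standard library. The function names are hypothetical, and a successful TCP connection only shows that something is listening there, not that it is a Flink JobManager.

```python
import socket


def remote_properties(host, port):
    """Build the three interpreter properties that remote mode needs."""
    return {
        "flink.execution.mode": "remote",
        "flink.execution.remote.host": host,
        "flink.execution.remote.port": str(port),
    }


def job_manager_reachable(props, timeout=3.0):
    """Return True if a TCP connection to the configured host/port succeeds."""
    host = props["flink.execution.remote.host"]
    port = int(props["flink.execution.remote.port"])
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

The values passed to `remote_properties` are whatever host and port your own cluster exposes; none are assumed here.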
To run Flink in YARN mode, you need the following settings:

- Set `flink.execution.mode` to `yarn`.
- Set `HADOOP_CONF_DIR` in Flink's interpreter setting.
- Make sure the `hadoop` command is on your PATH, because internally Flink calls `hadoop classpath` and loads all the Hadoop-related jars into the Flink interpreter process.

Flink's Table API supports 2 planners: `flink` & `blink`.

- The `flink` planner is available through `btenv_2` and `stenv_2`.
- The `blink` planner is available through `btenv` and `stenv`. This is also the planner that the Flink batch/streaming SQL interpreters use (`%flink.bsql` & `%flink.ssql`).

To use Hive in Flink, you need the following settings:

- Set `zeppelin.flink.enableHive` to `true`.
- Set `zeppelin.flink.hive.version` to the Hive version you are using.
- Set `HIVE_CONF_DIR` to the directory where `hive-site.xml` is located. Make sure the Hive metastore is started and that you have configured `hive.metastore.uris` in `hive-site.xml`.

After these settings, you can query Hive tables via either the Table API (`%flink`) or batch SQL (`%flink.bsql`).
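The Hive prerequisites above can be checked before starting the interpreter. The sketch below looks for `hive-site.xml` in a given directory and reads `hive.metastore.uris` from it. The function name `metastore_uris` is hypothetical, and the code assumes the usual Hadoop-style layout of `hive-site.xml` (a `<configuration>` root with `<property>` children that each hold a `<name>` and a `<value>`).

```python
import os
import xml.etree.ElementTree as ET


def metastore_uris(hive_conf_dir):
    """Return hive.metastore.uris from hive-site.xml, or None if it is not set."""
    path = os.path.join(hive_conf_dir, "hive-site.xml")
    if not os.path.isfile(path):
        return None
    root = ET.parse(path).getroot()
    for prop in root.findall("property"):
        if prop.findtext("name", "").strip() == "hive.metastore.uris":
            value = prop.findtext("value", "").strip()
            return value or None
    return None
```

A `None` result means either the file is missing from `HIVE_CONF_DIR` or the metastore URI has not been configured, which are the two conditions the settings above ask you to rule out.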
`%flink.bsql` is used for Flink's batch SQL. Type `help` to get all the available commands.

- Use an `insert into` statement for batch ETL.
- Use a `select` statement for exploratory data analytics.

`%flink.ssql` is used for Flink's streaming SQL. Type `help` to get all the available commands. There are mainly 2 cases:

- Use an `insert into` statement for streaming processing.
- Use a `select` statement for streaming data analytics.

You can use Flink Scala UDFs or Python UDFs in SQL. UDFs are the same for batch and streaming SQL. Here are 2 examples.
```scala
%flink

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())
```

```python
%flink.pyflink

class PythonUpper(ScalarFunction):
  def eval(self, s):
    return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
```
Besides defining UDFs in Zeppelin, you can also load UDFs from jars via `flink.udf.jars`. For example, you can write UDFs in IntelliJ and build them into one jar. Then set `flink.udf.jars` to the path of this jar, and the Flink interpreter will detect all the UDFs in the jar and register them with the TableEnvironment. The name of each UDF is its class name.
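To get a rough idea of which names a jar might contribute, you can list its classes with the Python standard library, since a jar is a zip archive. This is only a sketch under stated assumptions: the function name is hypothetical, it lists every top-level class rather than only UDFs (checking the class hierarchy would need a JVM), and it assumes "class name" means the simple class name, which the text above does not spell out.

```python
import zipfile


def class_names_in_jar(jar_path):
    """Map simple class name -> fully qualified name for top-level classes."""
    names = {}
    with zipfile.ZipFile(jar_path) as jar:
        for entry in jar.namelist():
            # Skip non-class files and inner/anonymous classes (Foo$Bar.class).
            if not entry.endswith(".class") or "$" in entry:
                continue
            qualified = entry[: -len(".class")].replace("/", ".")
            names[qualified.rsplit(".", 1)[-1]] = qualified
    return names
```

If two classes in different packages share a simple name, this mapping keeps only the last one, which is a hint that such a jar could also produce a naming clash when its UDFs are registered.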
Zeppelin automatically injects `ZeppelinContext` as the variable `z` in your Scala/Python environment. `ZeppelinContext` provides some additional functions and utilities. See Zeppelin-Context for more details.
By default, Zeppelin uses IPython in `%flink.pyflink` when IPython is available; otherwise, it falls back to the original Python implementation. For the IPython features, refer to the Python Interpreter doc.
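A quick way to guess which implementation you will get is to check whether the `IPython` package can be imported in the Python environment that Zeppelin uses. The sketch below is an approximation: the function names are hypothetical, and Zeppelin's own availability check may require more than the `IPython` package alone.

```python
import importlib.util


def ipython_available():
    """Return True if the IPython package can be found in this environment."""
    return importlib.util.find_spec("IPython") is not None


def expected_implementation():
    """Approximate the fallback described above: IPython first, then plain Python."""
    return "ipython" if ipython_available() else "python"
```

Run it with the same Python executable that the interpreter is configured to use, otherwise the result describes a different environment.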
Zeppelin ships with several Flink tutorial notes that may be helpful. Apart from the first one, the 4 notes below cover the 4 main scenarios of Flink.