{% include JB/setup %}
Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
In Zeppelin 0.9, we refactored the Flink interpreter to support the latest version of Flink. Only Flink 1.10+ is supported; older versions of Flink won't work. Apache Flink is supported in Zeppelin through the Flink interpreter group, which consists of the following five interpreters: `%flink`, `%flink.pyflink`, `%flink.ipyflink`, `%flink.ssql` and `%flink.bsql`.
The Flink interpreter can be configured with properties provided by Zeppelin (see the following table). You can also add and set other Flink properties that are not listed in the table. For a list of additional properties, refer to Flink Available Properties.
Zeppelin will create 6 variables as the Flink Scala (`%flink`) entry point:

* `senv` (StreamExecutionEnvironment)
* `benv` (ExecutionEnvironment)
* `stenv` (StreamTableEnvironment for blink planner)
* `btenv` (BatchTableEnvironment for blink planner)
* `stenv_2` (StreamTableEnvironment for flink planner)
* `btenv_2` (BatchTableEnvironment for flink planner)

And it will create 6 variables as the PyFlink (`%flink.pyflink` or `%flink.ipyflink`) entry point:

* `s_env` (StreamExecutionEnvironment)
* `b_env` (ExecutionEnvironment)
* `st_env` (StreamTableEnvironment for blink planner)
* `bt_env` (BatchTableEnvironment for blink planner)
* `st_env_2` (StreamTableEnvironment for flink planner)
* `bt_env_2` (BatchTableEnvironment for flink planner)

There are 2 planners supported by Flink's Table API: `flink` and `blink`.

* If you want to use the DataSet API and convert it to a Flink table, use the flink planner (`btenv_2` and `stenv_2`).
* In other cases, we recommend the `blink` planner. This is also what the Flink batch/streaming SQL interpreters use (`%flink.bsql` and `%flink.ssql`).

Check this page for the differences between the flink planner and the blink planner.
Flink in Zeppelin supports 3 execution modes (`flink.execution.mode`): `local`, `remote` and `yarn`.
Running Flink in local mode starts a MiniCluster in the local JVM. By default, the local MiniCluster uses port 8081, so make sure this port is available on your machine; otherwise, you can configure `rest.port` to specify another port. You can also set `local.number-taskmanager` and `flink.tm.slot` to customize the number of TaskManagers (TMs) and the number of slots per TM, because by default there are only 4 TMs with 1 slot each, which may not be enough in some cases.
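Before choosing local mode, it can help to check the two constraints above on the machine that runs the interpreter. The following Python sketch is illustrative only and is not part of Zeppelin: `port_is_free` tries to bind the port (8081 unless you changed `rest.port`), and `fits_in_local_cluster` is a rough capacity check that assumes Flink's default slot sharing, under which a job occupies as many slots as its highest operator parallelism. Both function names are hypothetical.

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if nothing on this machine is bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            # Typically "Address already in use": pick another rest.port.
            return False
    return True


def fits_in_local_cluster(job_parallelisms, num_taskmanagers=4, slots_per_tm=1):
    """Rough capacity check for concurrently running jobs.

    Assumes default slot sharing, so each job occupies as many slots as its
    highest operator parallelism, and concurrent jobs add up. The defaults
    mirror the local MiniCluster defaults described above (4 TMs x 1 slot).
    """
    total_slots = num_taskmanagers * slots_per_tm
    return sum(job_parallelisms) <= total_slots


if __name__ == "__main__":
    print("port 8081 free:", port_is_free(8081))
    # Two jobs with parallelism 2 each use all 4 default slots ...
    print(fits_in_local_cluster([2, 2]))
    # ... so a third job needs more TMs or more slots per TM.
    print(fits_in_local_cluster([2, 2, 1], num_taskmanagers=4, slots_per_tm=2))
```

If the check fails, raising `flink.tm.slot` or `local.number-taskmanager` increases the total number of slots in the same way the `num_taskmanagers * slots_per_tm` product does here.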
Running Flink in remote mode connects to an existing Flink cluster, which could be a standalone cluster or a YARN session cluster. Besides setting `flink.execution.mode` to `remote`, you also need to specify `flink.execution.remote.host` and `flink.execution.remote.port` to point to the Flink JobManager.
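A quick way to confirm that `flink.execution.remote.host` and `flink.execution.remote.port` point at a live JobManager is to query the cluster's REST API. The sketch below assumes the configured port is the JobManager's REST port (8081 by default in Flink) and uses Flink's `/overview` REST endpoint; the helper name `cluster_overview` and the host name in the usage block are hypothetical.

```python
import json
import urllib.error
import urllib.request


def cluster_overview(host: str, port: int, timeout: float = 5.0):
    """Fetch http://host:port/overview from the Flink REST API.

    Returns the decoded JSON dict, or None if the JobManager cannot be
    reached or does not answer with valid JSON.
    """
    url = f"http://{host}:{port}/overview"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except (urllib.error.URLError, OSError, ValueError):
        return None


if __name__ == "__main__":
    # Hypothetical JobManager address; use your remote host/port settings.
    overview = cluster_overview("jobmanager.example.com", 8081)
    if overview is None:
        print("JobManager not reachable")
    else:
        print(overview.get("flink-version"), overview.get("slots-available"))
```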
In order to run Flink in YARN mode, you need to make the following settings:

* Set `flink.execution.mode` to `yarn`.
* Set `HADOOP_CONF_DIR` in Flink's interpreter setting.
* Make sure the `hadoop` command is on your `PATH`, because internally Flink calls the command `hadoop classpath` and loads all the Hadoop-related jars in the Flink interpreter process.

In order to use Hive in Flink, you have to make the following settings:

* Set `zeppelin.flink.enableHive` to `true`.
* Set `zeppelin.flink.hive.version` to the Hive version you are using.
* Set `HIVE_CONF_DIR` to the location where `hive-site.xml` is located. Make sure the Hive metastore is started and you have configured `hive.metastore.uris` in `hive-site.xml`.
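The YARN and Hive prerequisites above can be checked before starting the interpreter. This Python sketch is not part of Zeppelin: `hadoop_classpath` runs the same `hadoop classpath` command mentioned above (returning None if `hadoop` is not on `PATH`), and `metastore_uris` reads `hive.metastore.uris` from `hive-site.xml` in a given `HIVE_CONF_DIR`. Both helper names are hypothetical, and the parser assumes the usual Hadoop-style `<configuration><property><name>/<value></property></configuration>` layout.

```python
import os
import shutil
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path


def hadoop_classpath():
    """Return the output of `hadoop classpath`, or None if hadoop is not on PATH."""
    hadoop = shutil.which("hadoop")
    if hadoop is None:
        return None
    result = subprocess.run([hadoop, "classpath"], capture_output=True, text=True, check=True)
    return result.stdout.strip()


def metastore_uris(hive_conf_dir):
    """Return hive.metastore.uris from <hive_conf_dir>/hive-site.xml, or None if unset."""
    root = ET.parse(str(Path(hive_conf_dir) / "hive-site.xml")).getroot()
    for prop in root.iter("property"):
        if (prop.findtext("name") or "").strip() == "hive.metastore.uris":
            value = (prop.findtext("value") or "").strip()
            return value or None
    return None


if __name__ == "__main__":
    print("hadoop classpath found:", hadoop_classpath() is not None)
    conf_dir = os.environ.get("HIVE_CONF_DIR")
    if conf_dir:
        print("hive.metastore.uris =", metastore_uris(conf_dir))
```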
`%flink.bsql` is used for Flink's batch SQL. You can type `help` to get all the available commands. It supports all Flink SQL statements, including DML, DDL and DQL.

* Use an `insert into` statement for batch ETL.
* Use a `select` statement for batch data analytics.

`%flink.ssql` is used for Flink's streaming SQL. You can type `help` to get all the available commands. It supports all Flink SQL statements, including DML, DDL and DQL.

* Use an `insert into` statement for streaming ETL.
* Use a `select` statement for streaming data analytics.

Zeppelin supports 3 types of streaming data analytics: single, update and append.
Single mode is for the case when the result of the SQL statement is always one row, such as the following example. The output format is HTML, and you can use the paragraph local property `template` to specify the template of the final output content, with `{i}` as the placeholder for the ith column of the result.
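To make the `{i}` placeholder convention concrete, here is a small Python sketch of the substitution. It assumes the placeholders are 0-based column indexes and that each `{i}` is replaced by plain string substitution; it only mimics the behavior described above and is not Zeppelin's code. `render_single_row` and the table in the comment are hypothetical.

```python
def render_single_row(template: str, row) -> str:
    """Replace {0}, {1}, ... in `template` with the columns of a one-row result."""
    output = template
    for i, value in enumerate(row):
        # Plain replacement (not str.format) so other braces, e.g. inline CSS, are left alone.
        output = output.replace("{" + str(i) + "}", str(value))
    return output


# A one-row result such as: select max(event_time), count(1) from events
row = ["2020-05-01 10:00:00", 1024]
print(render_single_row("<h1>{1}</h1> events until <h2>{0}</h2>", row))
# -> <h1>1024</h1> events until <h2>2020-05-01 10:00:00</h2>
```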
Update mode is suitable when the output has more than one row and is continuously updated. Here's one example where we use `group by`.
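What distinguishes update mode is that existing rows of the result change over time. As an illustration only (plain Python, no Flink involved), the sketch below mimics a continuously updated `group by` count: every incoming event rewrites the row for its key, so the displayed table is always the latest full snapshot. `update_mode_snapshots` is a hypothetical name.

```python
from collections import Counter


def update_mode_snapshots(events):
    """Yield the whole result table after each event, like
    `select key, count(*) from t group by key` being refreshed."""
    counts = Counter()
    for key in events:
        counts[key] += 1      # the existing row for `key` is updated, not appended
        yield dict(counts)    # the table shown in the note is replaced by this snapshot


for snapshot in update_mode_snapshots(["page_a", "page_b", "page_a"]):
    print(snapshot)
```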
Append mode is suitable for the scenario where output data is always appended, e.g. the following example, which uses a tumble window.
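A tumble window produces one row per window, and once a window is closed its row never changes, so new rows are simply appended. The Python sketch below illustrates that with a count per fixed-size window. It assumes event timestamps arrive in ascending order (real Flink jobs rely on watermarks to decide when a window is complete), and `tumble_window_counts` is a hypothetical name.

```python
def tumble_window_counts(timestamps, size):
    """Return (window_start, count) rows for tumbling windows of `size` time units.

    Assumes ascending timestamps, so a window is final as soon as an event
    for a later window arrives; its row is then appended and never revised.
    """
    rows = []
    current_start, count = None, 0
    for ts in timestamps:
        start = ts - ts % size  # start of the tumbling window this event falls into
        if current_start is not None and start != current_start:
            rows.append((current_start, count))  # window closed: append its row
            count = 0
        current_start = start
        count += 1
    if current_start is not None:
        rows.append((current_start, count))  # flush the last window at end of input
    return rows


print(tumble_window_counts([1, 2, 5, 11, 12, 25], size=10))
```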
You can use Flink Scala UDFs or Python UDFs in SQL. UDFs for batch and streaming SQL are the same. Here are 2 examples.
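```scala
%flink

import org.apache.flink.table.functions.ScalarFunction

// Scalar UDF that upper-cases a string column
class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())
```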
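```python
%flink.pyflink

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

# Scalar UDF that upper-cases a string column
class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
```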
Zeppelin only supports Scala and Python for the Flink interpreter. If you want to write a Java UDF, or the UDF is complicated enough that it is not suitable to write in Zeppelin, you can write the UDF in an IDE and build a UDF jar. In Zeppelin you just need to set `flink.udf.jars` to this jar; the Flink interpreter will detect all the UDFs in this jar and register them to the TableEnvironment, using the class name as the UDF name.
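Since the registered UDF name is the class name, it can be handy to see which names a jar would yield before pointing `flink.udf.jars` at it. The Python sketch below only lists top-level classes in the jar (a jar is a zip archive); it cannot tell which classes actually extend a Flink function class, since that requires loading the classes on the JVM. It assumes the simple class name (without package) is used, skips nested classes containing `$`, and `candidate_udf_names` is a hypothetical name.

```python
import zipfile
from pathlib import PurePosixPath


def candidate_udf_names(jar_path):
    """List simple names of top-level classes in a jar (possible UDF names)."""
    names = []
    with zipfile.ZipFile(jar_path) as jar:
        for entry in jar.namelist():
            if not entry.endswith(".class") or entry.startswith("META-INF/"):
                continue
            simple = PurePosixPath(entry).stem  # "com/example/JavaUpper.class" -> "JavaUpper"
            if "$" in simple or simple in ("module-info", "package-info"):
                continue  # nested/anonymous classes and descriptors are not UDF candidates
            names.append(simple)
    return sorted(names)


if __name__ == "__main__":
    import sys
    print(candidate_udf_names(sys.argv[1]))  # e.g. python list_udfs.py my-udfs.jar
```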
In order to use PyFlink in Zeppelin, you just need to do the following configuration:

* Install `apache-flink` (e.g. `pip install apache-flink`).
* Set `zeppelin.pyflink.python` to the Python executable where `apache-flink` is installed, in case you have multiple Python installations.

PyFlink will create 6 variables for you:

* `s_env` (StreamExecutionEnvironment)
* `b_env` (ExecutionEnvironment)
* `st_env` (StreamTableEnvironment for blink planner)
* `bt_env` (BatchTableEnvironment for blink planner)
* `st_env_2` (StreamTableEnvironment for flink planner)
* `bt_env_2` (BatchTableEnvironment for flink planner)

By default, Zeppelin uses IPython in `%flink.pyflink` when IPython is available; otherwise it falls back to the original Python implementation. For the IPython features, refer to the Python Interpreter doc.
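If you have several Python installations and are unsure which one to put in `zeppelin.pyflink.python`, a small check like the following can help. It is a sketch, not part of Zeppelin: it tries `import pyflink` (the package installed by `apache-flink`) with each candidate interpreter and returns the first one that succeeds. The candidate names in the usage block are examples, and `first_python_with_pyflink` is a hypothetical name.

```python
import shutil
import subprocess
import sys


def first_python_with_pyflink(candidates):
    """Return the full path of the first candidate interpreter that can import pyflink."""
    for exe in candidates:
        path = shutil.which(exe)
        if path is None:
            continue  # not installed or not on PATH
        probe = subprocess.run([path, "-c", "import pyflink"], capture_output=True)
        if probe.returncode == 0:
            return path
    return None


if __name__ == "__main__":
    # Example candidates; adjust to the interpreters installed on your machine.
    print(first_python_with_pyflink(["python3", "python", sys.executable]))
```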
Zeppelin automatically injects `ZeppelinContext` as the variable `z` in your Scala/Python environment. `ZeppelinContext` provides some additional functions and utilities. See Zeppelin-Context for more details. You can use `z` to display both a Flink DataSet and a batch/stream table.
In the section Streaming Data Visualization, we demonstrated the different visualization types via the paragraph local property `type`. In this section, we list and explain all the supported local properties in the Flink interpreter.
Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. You can explore more features in the tutorial notes.