Apache Flink is an open source platform for distributed stream and batch data processing. Flink's core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
Here's a list of properties that can be configured to customize the Flink interpreter.
Besides these properties, you can also configure any Flink property, which will override the value in flink-conf.yaml. For more information about Flink configuration, you can find it here.
By default, the Flink interpreter runs in local mode, as the default value of flink.execution.mode is local. In local mode, Flink launches one MiniCluster which includes the JobManager and TaskManagers in one JVM. But you can still customize the MiniCluster via the following properties:
- `local.number-taskmanager`: how many TaskManagers to launch in the MiniCluster.
- `taskmanager.numberOfTaskSlots`: how many slots each TaskManager provides. By default it is 1.

If you want to run Flink in yarn mode, you have to set the following properties:
- `flink.execution.mode` to be `yarn`
- `HADOOP_CONF_DIR` must be specified either in `zeppelin-env.sh` or in the interpreter properties.

You can also customize the yarn mode via the following properties:
- `flink.yarn.jm.memory`: memory of the JobManager
- `flink.yarn.tm.memory`: memory of each TaskManager
- `flink.yarn.tm.num`: number of TaskManagers
- `flink.yarn.tm.slot`: slot number per TaskManager
- `flink.yarn.queue`: queue name of the yarn app

You have to set `query.proxy.ports` and `query.server.ports` to be a port range, otherwise it is impossible to launch multiple TaskManagers on one machine.
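For illustration, a yarn-mode setup in the interpreter settings might look like the fragment below. All values are placeholders chosen for this sketch, not recommendations:

{% highlight properties %}
# Run the interpreter against yarn instead of the local MiniCluster
flink.execution.mode    yarn
# Memory settings for JobManager and TaskManagers (placeholder values)
flink.yarn.jm.memory    1024
flink.yarn.tm.memory    2048
flink.yarn.tm.num       2
flink.yarn.tm.slot      4
flink.yarn.queue        default
# Port ranges, so multiple TaskManagers can run on one machine
query.proxy.ports       30000-30100
query.server.ports      30101-30200
{% endhighlight %}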
If you want to run Flink in standalone mode, you have to set the following properties:
- `flink.execution.mode` to be `remote`
- `flink.execution.remote.host` to be the host name of the JobManager
- `flink.execution.remote.port` to be the port of the JobManager's rest server

Zeppelin's Flink interpreter supports 3 kinds of interpreters:
- `%flink` (FlinkScalaInterpreter, runs Scala code)
- `%flink.bsql` (FlinkBatchSqlInterpreter, runs Flink batch SQL)
- `%flink.ssql` (FlinkStreamSqlInterpreter, runs Flink stream SQL)

%flink (FlinkScalaInterpreter)

FlinkScalaInterpreter allows users to run Scala code in Zeppelin. 4 variables are created for users:
Users can use these variables to run DataSet/DataStream/BatchTable/StreamTable related job.
e.g. The following code snippet uses `benv` to run a batch-style WordCount:

{% highlight scala %}
%flink

val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .groupBy(0)
  .sum(1)
  .print()
{% endhighlight %}
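As a reference for what this pipeline computes, here is the same word count written against plain Scala collections, with no Flink involved (the object and method names below are ours, for illustration only):

```scala
object WordCount {
  // Tokenize each line on whitespace, group equal words, and count each group.
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s"))
      .groupBy(identity)
      .map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    val result = count(Seq("hello world", "hello flink", "hello hadoop"))
    println(result("hello")) // 3
  }
}
```

The Flink version distributes the same flatMap/group/sum steps across TaskManager slots instead of running them on a single in-memory collection.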
The following uses `senv` to run a stream-style WordCount:

{% highlight scala %}
%flink

val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print()

senv.execute()
{% endhighlight %}
%flink.bsql (FlinkBatchSqlInterpreter)

FlinkBatchSqlInterpreter supports running SQL to query tables registered in the BatchTableEnvironment (`btenv`).
e.g. We can query the `wc` table which is registered in Scala code:

{% highlight scala %}
%flink

val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
  .flatMap(line => line.split("\\s"))
  .map(w => (w, 1))

btenv.registerOrReplaceBoundedStream("wc", data, 'word, 'number)
{% endhighlight %}
{% highlight sql %}
%flink.bsql
select word, sum(number) as c from wc group by word
{% endhighlight %}
%flink.ssql (FlinkStreamSqlInterpreter)

The Flink interpreter also supports stream SQL via FlinkStreamSqlInterpreter (`%flink.ssql`), and can visualize the streaming data. Overall there are 3 kinds of streaming SQL supported by `%flink.ssql`:
This kind of sql returns only one row of data, but this row is updated continually. Usually it is used for tracking the aggregated result of some metric, e.g. total page views, total transactions, etc. This kind of sql can be visualized via html. Here's one example which calculates the total page view and visualizes it via html.
{% highlight sql %}
%flink.ssql(type=single, parallelism=1, refreshInterval=3000, template={1} until {0}, enableSavePoint=true, runWithSavePoint=true)

select max(rowtime), count(1) from log
{% endhighlight %}
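A sketch of how such an output template could be filled, assuming `{i}` refers to the i-th column of the single result row (the helper object below is ours, not part of Zeppelin's API):

```scala
object TemplateFill {
  // Replace each {i} placeholder with the i-th value of the result row.
  def fill(template: String, row: Seq[String]): String =
    row.zipWithIndex.foldLeft(template) { case (acc, (value, i)) =>
      acc.replace(s"{$i}", value)
    }

  def main(args: Array[String]): Unit = {
    // Column 0 is max(rowtime), column 1 is count(1) in the query above.
    println(fill("{1} until {0}", Seq("2020-01-01 00:00:05", "327")))
    // → 327 until 2020-01-01 00:00:05
  }
}
```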
This kind of sql returns a fixed number of rows, which are updated continually. Usually it is used for tracking the aggregated result of some metric broken down by a dimension, e.g. total page views per page, total transactions per country, etc. This kind of sql can be visualized via the built-in visualization charts of Zeppelin, such as the bar chart, line chart, etc. Here's one example which calculates the total page view per page and visualizes it via a bar chart.
{% highlight sql %}
%flink.ssql(type=retract, refreshInterval=2000, parallelism=2, enableSavePoint=true, runWithSavePoint=true)

select url, count(1) as pv from log group by url
{% endhighlight %}
This kind of sql returns a fixed number of rows regularly as a time series. It is usually used for tracking metrics by time window. e.g. Here's one example which calculates the page view for each 5-second window.
{% highlight sql %}
%flink.ssql(type=ts, refreshInterval=2000, enableSavePoint=false, runWithSavePoint=false, threshold=60000)

select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as start_time, url, count(1) as pv from log group by TUMBLE(rowtime, INTERVAL '5' SECOND), url
{% endhighlight %}
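The `TUMBLE` window above assigns each row to a fixed, non-overlapping 5-second bucket keyed by its rowtime. The bucketing arithmetic can be sketched in plain Scala like this (the object is ours, for illustration; timestamps are epoch milliseconds):

```scala
object Tumble {
  // Window size matching INTERVAL '5' SECOND in the query above.
  val windowMillis = 5000L

  // Start of the tumbling window that a given timestamp falls into;
  // this corresponds to what TUMBLE_START reports for that row.
  def windowStart(ts: Long): Long = ts - (ts % windowMillis)

  def main(args: Array[String]): Unit = {
    println(windowStart(12345L)) // 10000
    println(windowStart(4999L))  // 0
  }
}
```

Rows with the same `windowStart` (and the same `url`) are counted together, which is why the query emits one row per window per url.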
Here's a list of properties that you can use to customize Flink stream sql.

Press `tab` for code completion.

The Flink interpreter logs can be found under `ZEPPELIN_HOME/logs`.