This document describes how to use Kylin as a data source in Apache Flink.
There were several attempts to do this in Scala over JDBC, but none of them worked.
We will use createInput and JDBCInputFormat in batch mode to access Kylin via JDBC. The existing examples are implemented only in Java (see the mailing list), not in Scala. This doc will solve these problems step by step.
This can be our initial skeleton:
{% highlight scala %}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.apache.kylin.jdbc.Driver")
  .setDBUrl("jdbc:kylin://172.17.0.2:7070/learn_kylin")
  .setUsername("ADMIN")
  .setPassword("KYLIN")
  .setQuery("select count(distinct seller_id) as sellers from kylin_sales group by part_dt order by part_dt")
  .finish()

val dataset = env.createInput(inputFormat)
{% endhighlight %}
The first error is that JDBCInputFormat cannot be resolved:
Add to Scala:
{% highlight scala %}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
{% endhighlight %}
The next error is a missing dependency.
We can solve it with a new dependency (mvn repository: jdbc); add this to your pom.xml:
{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-jdbc</artifactId>
  <version>${flink.version}</version>
</dependency>
{% endhighlight %}
Similarly to the previous point, we need to solve the dependency for the Row class (mvn repository: Table).
In pom.xml add:
{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
{% endhighlight %}
In Scala:
{% highlight scala %}
import org.apache.flink.api.table.Row
{% endhighlight %}
This is the new error to solve:
If we check the code of JDBCInputFormat.java, we can see a new (and mandatory) property added in Apr 2016 by FLINK-3750 (see the JDBCInputFormat v1.2 manual, in Java).
Add the new Property: setRowTypeInfo
{% highlight scala %}
val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.apache.kylin.jdbc.Driver")
  .setDBUrl("jdbc:kylin://172.17.0.2:7070/learn_kylin")
  .setUsername("ADMIN")
  .setPassword("KYLIN")
  .setQuery("select count(distinct seller_id) as sellers from kylin_sales group by part_dt order by part_dt")
  .setRowTypeInfo(DB_ROWTYPE)
  .finish()
{% endhighlight %}
How can we configure this property in Scala? Attempt 4 shows an incorrect solution.
We can check the expected types using IntelliSense.
Then we will need to add more dependencies; add to Scala:
{% highlight scala %}
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
{% endhighlight %}
Create an Array or Seq of TypeInformation[_].
Solution:
{% highlight scala %}
val stringColumn: TypeInformation[String] = createTypeInformation[String]
val DB_ROWTYPE = new RowTypeInfo(Seq(stringColumn))
{% endhighlight %}
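Putting the pieces above together, the whole skeleton can be sketched as one program. This is only a sketch under the assumptions already used in this doc (the example driver URL, the ADMIN/KYLIN demo credentials, and the single-column query), not a definitive implementation:

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.TypeInformation

object ReadKylin {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // One String column, matching the single-column query below
    val stringColumn: TypeInformation[String] = createTypeInformation[String]
    val DB_ROWTYPE = new RowTypeInfo(Seq(stringColumn))

    val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("org.apache.kylin.jdbc.Driver")
      .setDBUrl("jdbc:kylin://172.17.0.2:7070/learn_kylin")
      .setUsername("ADMIN")
      .setPassword("KYLIN")
      .setQuery("select count(distinct seller_id) as sellers from kylin_sales group by part_dt order by part_dt")
      .setRowTypeInfo(DB_ROWTYPE)
      .finish()

    // Triggers the job and prints each Row to standard output
    val dataset = env.createInput(inputFormat)
    dataset.print()
  }
}
{% endhighlight %}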
We need to find the kylin-jdbc-x.x.x.jar and then expose it to Flink.
Find the Kylin JDBC jar
From the Kylin download page, choose the binary for the correct versions of Kylin and HBase.
Download and unpack it; the JDBC driver is in ./lib:
Make this JAR accessible to Flink
If you run Flink as a service, you need to put this JAR on your Java classpath, for example via your .bashrc.
Check the actual value:
Check the permissions on this file (it must be readable by your user).
If you are executing from the IDE, you need to add the JAR to your classpath manually:
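As a sketch, the classpath setup and both checks can be done from the shell like this (the path and the x.x.x version are placeholders you must replace with your actual unpack location and version):

{% highlight bash %}
# Add the Kylin JDBC driver to the Java classpath (e.g. in ~/.bashrc)
export CLASSPATH=$CLASSPATH:/path/to/apache-kylin-bin/lib/kylin-jdbc-x.x.x.jar

# Check the actual value
echo $CLASSPATH

# Check the permissions for this file (must be readable by your user)
ls -l /path/to/apache-kylin-bin/lib/kylin-jdbc-x.x.x.jar
{% endhighlight %}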
On IntelliJ:
The result will be similar to:
It is related to FLINK-4108 (see the mailing list), and Timo Walther made a PR for it.
If you are running Flink <= 1.2 you will need to apply this patch and run mvn clean install.
The error message gives you both the problem and the solution. Nice!
The output should be similar to this, printing the result of the query to standard output:
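A minimal way to produce that output, assuming dataset is the DataSet[Row] created earlier, is Flink's built-in print, which triggers execution and writes each row to stdout:

{% highlight scala %}
// Executes the job and prints every Row of the result to standard output
dataset.print()
{% endhighlight %}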
Try with a multi-column and multi-type query:
{% highlight sql %}
select part_dt,
       sum(price) as total_selled,
       count(distinct seller_id) as sellers
from kylin_sales
group by part_dt
order by part_dt
{% endhighlight %}
DB_ROWTYPE needs to change accordingly.
Also import the Java classes needed to work with Java data types:
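A sketch of the updated imports and DB_ROWTYPE for the three-column query above, assuming part_dt maps to java.sql.Date, total_selled to java.math.BigDecimal, and sellers to Long (the types named in the conclusion of this doc):

{% highlight scala %}
import java.math.BigDecimal
import java.sql.Date

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.table.typeutils.RowTypeInfo

// One TypeInformation per column, in the same order as the SELECT list
val dateColumn: TypeInformation[Date] = createTypeInformation[Date]
val decimalColumn: TypeInformation[BigDecimal] = createTypeInformation[BigDecimal]
val longColumn: TypeInformation[Long] = createTypeInformation[Long]

val DB_ROWTYPE = new RowTypeInfo(Seq(dateColumn, decimalColumn, longColumn))
{% endhighlight %}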
The new result will be:
Check that your HBase and Kylin are working. You can also use the Kylin UI for this.
See KYLIN-1898.
It is a problem with the kylin-jdbc-1.x.x JAR; you need Calcite 1.8 or above. The solution is to use Kylin 1.5.4 or above.
This is a problem with Scala versions; check your actual version with "scala -version" and choose the matching POM.
Perhaps you will also need IntelliJ's File > Invalidate Caches / Restart.
I added a POM for Scala 2.11.
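For Scala 2.11, only the artifact suffix changes; a sketch of the pom fragment, assuming Flink's standard Scala-version suffix convention for artifact names:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
{% endhighlight %}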
Now you can read Kylin’s data from Apache Flink, great!
All integration problems are solved, and the setup has been tested with different data types (Long, BigDecimal and Date). The patch was committed on 15 Oct and will be part of Flink 1.2.