title: “FlinkML - Machine Learning for Flink” nav-id: ml nav-show_overview: true nav-title: Machine Learning nav-parent_id: libs nav-pos: 4

FlinkML is the Machine Learning (ML) library for Flink. It is a new effort in the Flink community, with a growing list of algorithms and contributors. With FlinkML we aim to provide scalable ML algorithms, an intuitive API, and tools that help minimize glue code in end-to-end ML systems. You can see more details about our goals and where the library is headed in our vision and roadmap here.

  • This will be replaced by the TOC {:toc}

Supported Algorithms

FlinkML currently supports the following algorithms:

Supervised Learning

Unsupervised Learning

Data Preprocessing

Recommendation

Outlier selection

Utilities

Getting Started

You can check out our quickstart guide for a comprehensive getting started example.

If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html). Next, you have to add the FlinkML dependency to the pom.xml of your project.

{% highlight xml %} com.alibaba.blink flink-ml{{ site.scala_version_suffix }} {{site.version }} {% endhighlight %}

Note that FlinkML is currently not part of the binary distribution. See linking with it for cluster execution here.

Now you can start solving your analysis task. The following code snippet shows how easy it is to train a multiple linear regression model.

{% highlight scala %} // LabeledVector is a feature vector with a label (class or real value) val trainingData: DataSet[LabeledVector] = ... val testingData: DataSet[Vector] = ...

// Alternatively, a Splitter is used to break up a DataSet into training and testing data. val dataSet: DataSet[LabeledVector] = ... val trainTestData: DataSet[TrainTestDataSet] = Splitter.trainTestSplit(dataSet) val trainingData: DataSet[LabeledVector] = trainTestData.training val testingData: DataSet[Vector] = trainTestData.testing.map(lv => lv.vector)

val mlr = MultipleLinearRegression() .setStepsize(1.0) .setIterations(100) .setConvergenceThreshold(0.001)

mlr.fit(trainingData)

// The fitted model can now be used to make predictions val predictions: DataSet[LabeledVector] = mlr.predict(testingData) {% endhighlight %}

Pipelines

A key concept of FlinkML is its scikit-learn inspired pipelining mechanism. It allows you to quickly build complex data analysis pipelines how they appear in every data scientist‘s daily work. An in-depth description of FlinkML’s pipelines and their internal workings can be found here.

The following example code shows how easy it is to set up an analysis pipeline with FlinkML.

{% highlight scala %} val trainingData: DataSet[LabeledVector] = ... val testingData: DataSet[Vector] = ...

val scaler = StandardScaler() val polyFeatures = PolynomialFeatures().setDegree(3) val mlr = MultipleLinearRegression()

// Construct pipeline of standard scaler, polynomial features and multiple linear regression val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr)

// Train pipeline pipeline.fit(trainingData)

// Calculate predictions val predictions: DataSet[LabeledVector] = pipeline.predict(testingData) {% endhighlight %}

One can chain a Transformer to another Transformer or a set of chained Transformers by calling the method chainTransformer. If one wants to chain a Predictor to a Transformer or a set of chained Transformers, one has to call the method chainPredictor.

How to contribute

The Flink community welcomes all contributors who want to get involved in the development of Flink and its libraries. In order to get quickly started with contributing to FlinkML, please read our official contribution guide.