| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| <!DOCTYPE html> |
| |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| |
| <title>Apache Flink 0.9.0 Documentation: FlinkML - Looking under the hood of pipelines</title> |
| |
| <link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> |
| |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css"> |
| |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [['$','$'], ['\\(','\\)']] }, |
| TeX: { |
| equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script type="text/javascript" |
| src="https://cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML"> |
| </script> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| <body> |
| |
| |
| |
| |
| |
| |
| <!-- Top navbar. --> |
| <nav class="navbar navbar-default navbar-fixed-top"> |
| <div class="container"> |
| <!-- The logo. --> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <div class="navbar-logo"> |
| <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a> |
| </div> |
| </div><!-- /.navbar-header --> |
| |
| <!-- The navigation links. --> |
| <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> |
| <ul class="nav navbar-nav"> |
| <li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li> |
| |
| <!-- Setup --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li> |
| |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li> |
| </ul> |
| </li> |
| |
| <!-- Programming Guides --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="scala_shell.html">Interactive Scala Shell</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Libraries --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Internals --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li> |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Internals</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture & Process Model</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction & Serialization</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs & Scheduling</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li> |
| </ul> |
| </li> |
| </ul> |
| <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html"> |
| <div class="form-group"> |
| <input type="text" class="form-control" name="q" placeholder="Search all pages"> |
| </div> |
| <button type="submit" class="btn btn-default">Search</button> |
| </form> |
| </div><!-- /.navbar-collapse --> |
| </div><!-- /.container --> |
| </nav> |
| |
| |
| |
| |
| |
| <!--Some of the Latex math notation has been adapted from Apache Spark MLlib's documentation--> |
| $$ |
| \newcommand{\R}{\mathbb{R}} |
| \newcommand{\E}{\mathbb{E}} |
| \newcommand{\x}{\mathbf{x}} |
| \newcommand{\y}{\mathbf{y}} |
| \newcommand{\wv}{\mathbf{w}} |
| \newcommand{\av}{\mathbf{\alpha}} |
| \newcommand{\bv}{\mathbf{b}} |
| \newcommand{\N}{\mathbb{N}} |
| \newcommand{\id}{\mathbf{I}} |
| \newcommand{\ind}{\mathbf{1}} |
| \newcommand{\0}{\mathbf{0}} |
| \newcommand{\unit}{\mathbf{e}} |
| \newcommand{\one}{\mathbf{1}} |
| \newcommand{\zero}{\mathbf{0}} |
| \newcommand\rfrac[2]{^{#1}\!/_{#2}} |
| \newcommand{\norm}[1]{\left\lVert#1\right\rVert} |
| $$ |
| |
| |
| <!-- Main content. --> |
| <div class="container"> |
| |
| |
| <div class="row"> |
| <div class="col-sm-10 col-sm-offset-1"> |
| <h1><a href="../ml">FlinkML</a> - Looking under the hood of pipelines</h1> |
| |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#introduction" id="markdown-toc-introduction">Introduction</a></li> |
| <li><a href="#the-what-and-the-why" id="markdown-toc-the-what-and-the-why">The what and the why</a></li> |
| <li><a href="#pipelines-in-flinkml" id="markdown-toc-pipelines-in-flinkml">Pipelines in FlinkML</a> <ul> |
| <li><a href="#scala-implicits" id="markdown-toc-scala-implicits">Scala implicits</a></li> |
| <li><a href="#operations" id="markdown-toc-operations">Operations</a></li> |
| <li><a href="#chaining" id="markdown-toc-chaining">Chaining</a></li> |
| <li><a href="#how-to-implement-a-pipeline-operator" id="markdown-toc-how-to-implement-a-pipeline-operator">How to Implement a Pipeline Operator</a></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h2 id="introduction">Introduction</h2> |
| |
| <p>The ability to chain together different transformers and predictors is an important feature for |
| any Machine Learning (ML) library. In FlinkML we want to provide an intuitive API |
| and, at the same time, use the capabilities of the Scala language to provide |
| type-safe implementations of our pipelines. Our goal is an easy-to-use API |
| that protects users from type errors at pre-flight time (before the job is launched), thereby |
| eliminating cases where long-running jobs are submitted to the cluster only to fail because of some |
| error in the series of data transformations that commonly happen in an ML pipeline.</p> |
| |
| <p>In this guide we describe the choices we made when implementing chainable |
| transformers and predictors in FlinkML, and provide guidelines on how developers can create their |
| own algorithms that make use of these capabilities.</p> |
| |
| <h2 id="the-what-and-the-why">The what and the why</h2> |
| |
| <p>So what do we mean by “ML pipelines”? Pipelines in the ML context can be thought of as chains of |
| operations that take some data as input, perform a number of transformations on that data, |
| and then output the transformed data, either to be used as the input (features) of a predictor |
| function, such as a learning model, or to be used as-is in |
| some other task. The final learner can of course be part of the pipeline as well. |
| ML pipelines are often complicated sets of operations (<a href="http://research.google.com/pubs/pub43146.html">in-depth explanation</a>) and |
| can become sources of errors in end-to-end learning systems.</p> |
| |
| <p>The purpose of ML pipelines, then, is to provide a |
| framework for managing the complexity introduced by these chains of operations. |
| Pipelines should make it easy for developers to define chained transformations that are applied to the |
| training data to create the final features used to train a |
| learning model, and then to apply the same set of transformations just as easily to unlabeled |
| (test) data. Pipelines should also simplify cross-validation and model selection on |
| these chains of operations.</p> |
| |
| <p>Finally, by ensuring that consecutive links in the pipeline chain “fit together”, we also |
| avoid costly type errors. Since each step in a pipeline can be a computationally heavy operation, |
| we want to avoid running a pipelined job unless we are sure that all the input/output pairs in a |
| pipeline “fit”.</p> |
| |
| <h2 id="pipelines-in-flinkml">Pipelines in FlinkML</h2> |
| |
| <p>The building blocks for pipelines in FlinkML can be found in the <code>ml.pipeline</code> package. |
| FlinkML follows an API inspired by <a href="http://scikit-learn.org">sklearn</a>, which means that we have |
| <code>Estimator</code>, <code>Transformer</code> and <code>Predictor</code> interfaces. For an in-depth look at the design of the |
| sklearn API, the interested reader is referred to <a href="http://arxiv.org/abs/1309.0238">this</a> paper. |
| In short, <code>Estimator</code> is the base class from which <code>Transformer</code> and <code>Predictor</code> inherit. |
| <code>Estimator</code> defines a <code>fit</code> method; <code>Transformer</code> additionally defines a <code>transform</code> method, and |
| <code>Predictor</code> additionally defines a <code>predict</code> method.</p> |
| |
| <p>The <code>fit</code> method of the <code>Estimator</code> performs the actual training of the model, for example |
| finding the correct weights in a linear regression task, or the mean and standard deviation of |
| the data in a feature scaler. |
| As the names suggest, classes that implement |
| <code>Transformer</code> are transform operations like <a href="standard_scaler.html">scaling the input</a>, and |
| <code>Predictor</code> implementations are learning algorithms such as <a href="http://flink.apache.org/docs/0.9/libs/ml/multiple_linear_regression.html">Multiple Linear Regression</a>. |
| Pipelines can be created by chaining together a number of transformers, and the final link in a pipeline can be a predictor or another transformer. |
| Pipelines that end with a predictor cannot be chained any further. |
| Below is an example of how a pipeline can be formed:</p> |
| |
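| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.flink.api.scala._</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.common.LabeledVector</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Vector</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.preprocessing.</span><span class="o">{</span><span class="nc">PolynomialFeatures</span><span class="o">,</span> <span class="nc">StandardScaler</span><span class="o">}</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.regression.MultipleLinearRegression</span> |
| |
| <span class="c1">// Training data</span> |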
| <span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">LabeledVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> |
| <span class="c1">// Test data</span> |
| <span class="k">val</span> <span class="n">unlabeled</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Vector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> |
| |
| <span class="k">val</span> <span class="n">scaler</span> <span class="k">=</span> <span class="nc">StandardScaler</span><span class="o">()</span> |
| <span class="k">val</span> <span class="n">polyFeatures</span> <span class="k">=</span> <span class="nc">PolynomialFeatures</span><span class="o">()</span> |
| <span class="k">val</span> <span class="n">mlr</span> <span class="k">=</span> <span class="nc">MultipleLinearRegression</span><span class="o">()</span> |
| |
| <span class="c1">// Construct the pipeline</span> |
| <span class="k">val</span> <span class="n">pipeline</span> <span class="k">=</span> <span class="n">scaler</span> |
| <span class="o">.</span><span class="n">chainTransformer</span><span class="o">(</span><span class="n">polyFeatures</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">chainPredictor</span><span class="o">(</span><span class="n">mlr</span><span class="o">)</span> |
| |
| <span class="c1">// Train the pipeline (scaler and multiple linear regression)</span> |
| <span class="n">pipeline</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">input</span><span class="o">)</span> |
| |
| <span class="c1">// Calculate predictions for the testing data</span> |
| <span class="k">val</span> <span class="n">predictions</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">LabeledVector</span><span class="o">]</span> <span class="k">=</span> <span class="n">pipeline</span><span class="o">.</span><span class="n">predict</span><span class="o">(</span><span class="n">unlabeled</span><span class="o">)</span></code></pre></div> |
| |
| <p>As mentioned before, FlinkML pipelines are type-safe. |
| If we try to chain a transformer whose output type is <code>A</code> to another one whose input type is <code>B</code>, we |
| get an error at pre-flight time if <code>A</code> != <code>B</code>. FlinkML achieves this kind of type safety |
| through the use of Scala’s implicits.</p> |
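<p>The effect of such a check can be illustrated without Flink. The following is a minimal sketch, assuming a hypothetical <code>Step[I, O]</code> type in which plain type parameters (rather than FlinkML’s implicits) record each stage’s input and output types, with <code>Seq</code> standing in for <code>DataSet</code>. Because <code>chain</code> only accepts a next step whose input type equals the current output type, a mismatched chain is rejected by the compiler.</p>

```scala
// Hypothetical, simplified pipeline step: not FlinkML's API.
// Seq stands in for DataSet; I and O are the step's input and output types.
final case class Step[I, O](name: String, run: Seq[I] => Seq[O]) {
  // Only a step whose input type equals this step's output type O can follow.
  def chain[O2](next: Step[O, O2]): Step[I, O2] =
    Step(s"$name -> ${next.name}", (data: Seq[I]) => next.run(run(data)))
}

object StepDemo {
  val parse = Step[String, Double]("parse", _.map(_.toDouble))
  val square = Step[Double, Double]("square", _.map(x => x * x))
  val format = Step[Double, String]("format", _.map(_.toString))

  // String -> Double -> Double -> String: every link fits, so this compiles.
  val pipeline: Step[String, String] = parse.chain(square).chain(format)

  // Rejected by the compiler: `format` outputs String, but `square` expects Double.
  // val broken = format.chain(square)
}
```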
| |
| <h3 id="scala-implicits">Scala implicits</h3> |
| |
| <p>If you are not familiar with Scala’s implicits, we recommend <a href="https://www.artima.com/pins1ed/implicit-conversions-and-parameters.html">this excerpt</a> |
| from Martin Odersky’s “Programming in Scala”. In short, implicit conversions allow for ad-hoc |
| polymorphism in Scala by providing conversions from one type to another, and implicit values |
| provide the compiler with default values that can be supplied to function calls through implicit parameters. |
| The combination of implicit conversions and implicit parameters is what allows us to chain transform |
| and predict operations together in a type-safe manner.</p> |
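<p>As a generic illustration of the two mechanisms (plain Scala, not FlinkML code), the sketch below defines a hypothetical <code>Describe</code> type class whose instances are passed as implicit parameters, and a hypothetical <code>Meters</code> type with an implicit conversion from <code>Double</code>. Both implicits are placed in companion objects, which is one of the places the compiler searches.</p>

```scala
import scala.language.implicitConversions

// A hypothetical type class; instances are supplied through implicit parameters.
trait Describe[A] {
  def describe(a: A): String
}

object Describe {
  // Implicit values in the type class's companion object are found without an import.
  implicit val describeInt: Describe[Int] = new Describe[Int] {
    def describe(a: Int): String = s"Int($a)"
  }
  implicit val describeString: Describe[String] = new Describe[String] {
    def describe(a: String): String = s"String($a)"
  }
}

// A hypothetical wrapper type with an implicit conversion from Double.
final case class Meters(value: Double)

object Meters {
  implicit def fromDouble(d: Double): Meters = Meters(d)
}

object ImplicitsDemo {
  // Implicit parameter: the compiler picks the Describe instance that matches A.
  def render[A](a: A)(implicit d: Describe[A]): String = d.describe(a)

  // Expects Meters, but callers may pass a Double thanks to Meters.fromDouble.
  def twice(m: Meters): Meters = Meters(m.value * 2)
}
```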
| |
| <h3 id="operations">Operations</h3> |
| |
| <p>As mentioned before, the trait (abstract class) <code>Estimator</code> defines a <code>fit</code> method. The method has two |
| parameter lists (i.e. it is a <a href="http://docs.scala-lang.org/tutorials/tour/currying.html">curried function</a>). The |
| first parameter list takes the input (training) <code>DataSet</code> and the parameters for the estimator. The second parameter |
| list takes one <code>implicit</code> parameter of type <code>FitOperation</code>. <code>FitOperation</code> is a class that also |
| defines a <code>fit</code> method, and this is where the actual training logic of concrete estimators |
| should be implemented. The <code>fit</code> method of <code>Estimator</code> is essentially a wrapper around the <code>fit</code> |
| method of <code>FitOperation</code>. The <code>predict</code> method of <code>Predictor</code> and the <code>transform</code> method of |
| <code>Transformer</code> are designed in a similar manner, each with a corresponding operation class.</p> |
| |
| <p>In these methods the operation object is provided as an implicit parameter. |
| Among other places, Scala <a href="http://docs.scala-lang.org/tutorials/FAQ/finding-implicits.html">looks for implicits</a> |
| in the companion objects of the types involved, so classes that implement these interfaces should provide their |
| operation objects as implicit values inside their companion object.</p> |
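<p>A minimal sketch of this pattern, under simplifying assumptions: <code>Seq</code> replaces <code>DataSet</code>, a plain <code>Map</code> replaces <code>ParameterMap</code>, and <code>MinShifter</code> is a hypothetical transformer invented for illustration. The wrapper methods in <code>Estimator</code> and <code>Transformer</code> only forward to the implicit operation, and the operations live in the companion object of <code>MinShifter</code>, so callers need no import.</p>

```scala
// Simplified stand-ins for FlinkML's operation traits (hypothetical):
// Seq replaces DataSet and a plain Map replaces ParameterMap.
trait FitOperation[Self, Training] {
  def fit(instance: Self, params: Map[String, Any], input: Seq[Training]): Unit
}

trait TransformOperation[Self, Input, Output] {
  def transform(instance: Self, params: Map[String, Any], input: Seq[Input]): Seq[Output]
}

trait Estimator[Self] { that: Self =>
  // Curried: the second parameter list receives the implicit operation,
  // and the method only forwards to it.
  def fit[Training](input: Seq[Training], params: Map[String, Any] = Map.empty)(
      implicit op: FitOperation[Self, Training]): Unit =
    op.fit(this, params, input)
}

trait Transformer[Self] extends Estimator[Self] { that: Self =>
  def transform[Input, Output](input: Seq[Input], params: Map[String, Any] = Map.empty)(
      implicit op: TransformOperation[Self, Input, Output]): Seq[Output] =
    op.transform(this, params, input)
}

// Hypothetical transformer: subtracts the minimum value seen during fit.
class MinShifter extends Transformer[MinShifter] {
  // State written by the fit operation and read by the transform operation.
  var minOption: Option[Double] = None
}

object MinShifter {
  // Implicits in this companion object are part of the implicit scope of
  // FitOperation[MinShifter, _] and TransformOperation[MinShifter, _, _].
  implicit val fitDoubles: FitOperation[MinShifter, Double] =
    new FitOperation[MinShifter, Double] {
      def fit(instance: MinShifter, params: Map[String, Any], input: Seq[Double]): Unit =
        instance.minOption = Some(input.min)
    }

  implicit val transformDoubles: TransformOperation[MinShifter, Double, Double] =
    new TransformOperation[MinShifter, Double, Double] {
      def transform(instance: MinShifter, params: Map[String, Any], input: Seq[Double]): Seq[Double] = {
        val min = instance.minOption.getOrElse(
          throw new IllegalStateException("MinShifter has not been fitted"))
        input.map(_ - min)
      }
    }
}
```

<p>Note that a call such as <code>shifter.transform(data)</code> never names its output type; the compiler infers <code>Output</code> from the <code>TransformOperation</code> instance it finds.</p>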
| |
| <p>As an example, consider the <code>StandardScaler</code> class. <code>StandardScaler</code> extends <code>Transformer</code>, so it has access to its <code>fit</code> and <code>transform</code> methods. |
| These two methods expect objects of type <code>FitOperation</code> and <code>TransformOperation</code>, respectively, as implicit parameters, |
| which <code>StandardScaler</code> provides in its companion |
| object through <code>fitVectorStandardScaler</code> and <code>transformVectors</code>:</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">StandardScaler</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">StandardScaler</span><span class="o">]</span> <span class="o">{</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| |
| <span class="k">object</span> <span class="nc">StandardScaler</span> <span class="o">{</span> |
| |
| <span class="o">...</span> |
| |
| <span class="k">implicit</span> <span class="k">def</span> <span class="n">fitVectorStandardScaler</span><span class="o">[</span><span class="kt">T</span> <span class="k"><:</span> <span class="kt">Vector</span><span class="o">]</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">StandardScaler</span>, <span class="kt">T</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">def</span> <span class="n">fit</span><span class="o">(</span><span class="n">instance</span><span class="k">:</span> <span class="kt">StandardScaler</span><span class="o">,</span> <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span> |
| <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">implicit</span> <span class="k">def</span> <span class="n">transformVectors</span><span class="o">[</span><span class="kt">T</span> <span class="k"><:</span> <span class="kt">Vector:</span> <span class="kt">VectorConverter:</span> <span class="kt">TypeInformation:</span> <span class="kt">ClassTag</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">StandardScaler</span>, <span class="kt">T</span>, <span class="kt">T</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">def</span> <span class="n">transform</span><span class="o">(</span> |
| <span class="n">instance</span><span class="k">:</span> <span class="kt">StandardScaler</span><span class="o">,</span> |
| <span class="n">transformParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> |
| <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span> |
| <span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="o">}</span></code></pre></div> |
| |
| <p>Note that <code>StandardScaler</code> does <strong>not</strong> override the <code>fit</code> method of <code>Estimator</code> or the <code>transform</code> |
| method of <code>Transformer</code>. Rather, its implementations of <code>FitOperation</code> and <code>TransformOperation</code> |
| override their respective <code>fit</code> and <code>transform</code> methods, which are then called by the <code>fit</code> and |
| <code>transform</code> methods of <code>Estimator</code> and <code>Transformer</code>. Similarly, a class that implements |
| <code>Predictor</code> should define an implicit <code>PredictOperation</code> object inside its companion object.</p> |
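<p>The same structure applies to predictors. The sketch below is hypothetical and simplified (no parameter maps, <code>Seq</code> instead of <code>DataSet</code>): <code>SimpleLinearRegression</code> is a one-dimensional least-squares model, invented for illustration, whose <code>FitOperation</code> stores the learned weights on the instance and whose <code>PredictOperation</code>, defined in the same companion object, reads them back.</p>

```scala
// Simplified, hypothetical stand-ins for FlinkML's operation traits.
trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}

trait PredictOperation[Self, Testing, Prediction] {
  def predict(instance: Self, input: Seq[Testing]): Seq[Prediction]
}

trait Estimator[Self] { that: Self =>
  def fit[Training](input: Seq[Training])(implicit op: FitOperation[Self, Training]): Unit =
    op.fit(this, input)
}

trait Predictor[Self] extends Estimator[Self] { that: Self =>
  // Does not contain any model logic; it only forwards to the implicit operation.
  def predict[Testing, Prediction](input: Seq[Testing])(
      implicit op: PredictOperation[Self, Testing, Prediction]): Seq[Prediction] =
    op.predict(this, input)
}

// Hypothetical one-dimensional least-squares regressor (not part of FlinkML).
class SimpleLinearRegression extends Predictor[SimpleLinearRegression] {
  var weights: Option[(Double, Double)] = None // (slope, intercept)
}

object SimpleLinearRegression {
  // Training input: (x, y) pairs.
  implicit val fitPairs: FitOperation[SimpleLinearRegression, (Double, Double)] =
    new FitOperation[SimpleLinearRegression, (Double, Double)] {
      def fit(instance: SimpleLinearRegression, input: Seq[(Double, Double)]): Unit = {
        val n = input.size.toDouble
        val meanX = input.map(_._1).sum / n
        val meanY = input.map(_._2).sum / n
        val cov = input.map { case (x, y) => (x - meanX) * (y - meanY) }.sum
        val varX = input.map { case (x, _) => (x - meanX) * (x - meanX) }.sum
        val slope = cov / varX
        instance.weights = Some((slope, meanY - slope * meanX))
      }
    }

  // Prediction input: x values; output: (x, predicted y) pairs.
  implicit val predictDoubles: PredictOperation[SimpleLinearRegression, Double, (Double, Double)] =
    new PredictOperation[SimpleLinearRegression, Double, (Double, Double)] {
      def predict(instance: SimpleLinearRegression, input: Seq[Double]): Seq[(Double, Double)] = {
        val (slope, intercept) = instance.weights.getOrElse(
          throw new IllegalStateException("Model has not been fitted"))
        input.map(x => (x, slope * x + intercept))
      }
    }
}
```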
| |
| <h4 id="types-and-type-safety">Types and type safety</h4> |
| |
| <p>Apart from the <code>fit</code> and <code>transform</code> operations that we listed above, the <code>StandardScaler</code> also |
| provides <code>fit</code> and <code>transform</code> operations for input of type <code>LabeledVector</code>. |
| This allows us to use the algorithm for input that is labeled or unlabeled, and this happens |
| automatically, depending on the type of the input that we give to the fit and transform |
| operations. The correct implicit operation is chosen by the compiler, depending on the input type.</p> |
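<p>Type-directed selection can be sketched with two operations for two input types. Here <code>Vec</code>, <code>LabeledVec</code> and <code>Scaler</code> are hypothetical stand-ins for FlinkML’s <code>Vector</code>, <code>LabeledVector</code> and <code>StandardScaler</code>; the labeled variant reuses the vector logic and leaves the label untouched.</p>

```scala
// Hypothetical stand-ins for FlinkML's Vector and LabeledVector.
final case class Vec(values: Vector[Double])
final case class LabeledVec(label: Double, vec: Vec)

// Simplified operation trait (no parameter map, Seq instead of DataSet).
trait TransformOperation[Self, Input, Output] {
  def transform(instance: Self, input: Seq[Input]): Seq[Output]
}

// Hypothetical transformer that multiplies every feature by a fixed factor.
class Scaler(val factor: Double) {
  def transform[Input, Output](input: Seq[Input])(
      implicit op: TransformOperation[Scaler, Input, Output]): Seq[Output] =
    op.transform(this, input)
}

object Scaler {
  private def scale(instance: Scaler, v: Vec): Vec = Vec(v.values.map(_ * instance.factor))

  // Chosen by the compiler when the input is a Seq[Vec].
  implicit val transformVecs: TransformOperation[Scaler, Vec, Vec] =
    new TransformOperation[Scaler, Vec, Vec] {
      def transform(instance: Scaler, input: Seq[Vec]): Seq[Vec] =
        input.map(scale(instance, _))
    }

  // Chosen when the input is a Seq[LabeledVec]: features are scaled, the label is kept.
  implicit val transformLabeled: TransformOperation[Scaler, LabeledVec, LabeledVec] =
    new TransformOperation[Scaler, LabeledVec, LabeledVec] {
      def transform(instance: Scaler, input: Seq[LabeledVec]): Seq[LabeledVec] =
        input.map(lv => LabeledVec(lv.label, scale(instance, lv.vec)))
    }
}

// In this simplified setup, an unsupported input such as Seq("a") fails to compile,
// because no TransformOperation[Scaler, String, _] exists.
```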
| |
| <p>If we try to call the <code>fit</code> or <code>transform</code> methods with types that are not supported, we get a |
| runtime error before the job is launched. |
| While it would be possible to catch these kinds of errors at compile time as well, the error |
| messages that we could provide to the user would be much less informative, which is why we chose |
| to throw runtime exceptions instead.</p> |
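<p>One way to turn a missing operation into an informative error is sketched below. As an assumption for illustration, the fallback is expressed as a default value for the implicit parameter, which Scala uses when no implicit operation is found; this is a swapped-in technique and not necessarily how FlinkML implements its fallback. <code>CountingEstimator</code> and <code>FitOperation.unsupported</code> are hypothetical.</p>

```scala
// Simplified operation trait (hypothetical): no parameter map, Seq instead of DataSet.
trait FitOperation[Self, Training] {
  def fit(instance: Self, input: Seq[Training]): Unit
}

object FitOperation {
  // Hypothetical helper: an operation that fails immediately with a descriptive message.
  def unsupported[Self, Training](message: String): FitOperation[Self, Training] =
    new FitOperation[Self, Training] {
      def fit(instance: Self, input: Seq[Training]): Unit =
        throw new UnsupportedOperationException(message)
    }
}

// Hypothetical estimator that only knows how to fit on Int input.
class CountingEstimator {
  var seen: Int = 0

  // If the compiler finds no implicit FitOperation for this input type, it uses the
  // default argument, which throws before any of the input is processed.
  def fit[Training](input: Seq[Training])(
      implicit op: FitOperation[CountingEstimator, Training] =
        FitOperation.unsupported[CountingEstimator, Training](
          "CountingEstimator does not support this input type")): Unit =
    op.fit(this, input)
}

object CountingEstimator {
  implicit val fitInts: FitOperation[CountingEstimator, Int] =
    new FitOperation[CountingEstimator, Int] {
      def fit(instance: CountingEstimator, input: Seq[Int]): Unit =
        instance.seen += input.size
    }
}
```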
| |
| <h3 id="chaining">Chaining</h3> |
| |
| <p>Chaining is achieved by calling <code>chainTransformer</code> or <code>chainPredictor</code> on an object |
| of a class that implements <code>Transformer</code>. These methods return a <code>ChainedTransformer</code> or |
| <code>ChainedPredictor</code> object respectively. As we mentioned, <code>ChainedTransformer</code> objects can be |
| chained further, while <code>ChainedPredictor</code> objects cannot. These classes take care of applying |
| fit, transform, and predict operations for a pair of successive transformers or |
| a transformer and a predictor. They also act recursively when the chain is longer than two, |
| since every <code>ChainedTransformer</code> defines <code>transform</code> and <code>fit</code> |
| operations that can be further chained with more transformers or a predictor.</p> |
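<p>The fit/transform behavior of a chain can be sketched with hypothetical, simplified classes in which type parameters replace FlinkML’s implicit operations. In this sketch, fitting a chain fits the left transformer, transforms the training data with it, and fits the right transformer on that output; since a <code>ChainedTransformer</code> is itself a transformer, longer chains nest recursively. Predictor chaining is omitted.</p>

```scala
// Hypothetical, simplified transformer interface (not FlinkML's API): plain type
// parameters replace FlinkML's implicit operations, and Seq replaces DataSet.
trait SimpleTransformer[I, O] {
  def fit(input: Seq[I]): Unit
  def transform(input: Seq[I]): Seq[O]

  // The result is itself a SimpleTransformer, so it can be chained again.
  def chainTransformer[O2](next: SimpleTransformer[O, O2]): ChainedTransformer[I, O, O2] =
    new ChainedTransformer[I, O, O2](this, next)
}

final class ChainedTransformer[I, M, O](
    left: SimpleTransformer[I, M],
    right: SimpleTransformer[M, O]) extends SimpleTransformer[I, O] {

  // Fit the left part, then fit the right part on the left part's output.
  def fit(input: Seq[I]): Unit = {
    left.fit(input)
    right.fit(left.transform(input))
  }

  // Apply both parts in order.
  def transform(input: Seq[I]): Seq[O] = right.transform(left.transform(input))
}

// Subtracts the mean learned during fit.
final class Center extends SimpleTransformer[Double, Double] {
  private var mean = 0.0
  def fit(input: Seq[Double]): Unit = { mean = input.sum / input.size }
  def transform(input: Seq[Double]): Seq[Double] = input.map(_ - mean)
}

// Divides by the largest absolute value learned during fit.
final class MaxAbsScale extends SimpleTransformer[Double, Double] {
  private var maxAbs = 1.0
  def fit(input: Seq[Double]): Unit = { maxAbs = input.map(x => math.abs(x)).max }
  def transform(input: Seq[Double]): Seq[Double] = input.map(_ / maxAbs)
}
```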
| |
| <p>It is important to note that developers and users do not need to worry about chaining when |
| implementing their algorithms; all of this is handled automatically by FlinkML.</p> |
| |
| <h3 id="how-to-implement-a-pipeline-operator">How to Implement a Pipeline Operator</h3> |
| |
| <p>In order to support FlinkML’s pipelining, algorithms have to adhere to a certain design pattern, which we describe in this section. |
| Let’s assume that we want to implement a pipeline operator which changes the mean of the data. |
| Since centering data is a common pre-processing step in many analysis pipelines, we will implement it as a <code>Transformer</code>. |
| Therefore, we first create a <code>MeanTransformer</code> class, which inherits from <code>Transformer</code>:</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MeanTransformer</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">MeanTransformer</span><span class="o">]</span> <span class="o">{}</span></code></pre></div> |
| |
| <p>Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter.</p> |
| |
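| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.flink.ml.common.Parameter</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.pipeline.Transformer</span> |
| |
| <span class="k">class</span> <span class="nc">MeanTransformer</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">MeanTransformer</span><span class="o">]</span> <span class="o">{</span> |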
| <span class="k">def</span> <span class="n">setMean</span><span class="o">(</span><span class="n">mean</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span><span class="k">:</span> <span class="kt">this.</span><span class="k">type</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">parameters</span><span class="o">.</span><span class="n">add</span><span class="o">(</span><span class="nc">MeanTransformer</span><span class="o">.</span><span class="nc">Mean</span><span class="o">,</span> <span class="n">mean</span><span class="o">)</span> |
| <span class="k">this</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">object</span> <span class="nc">MeanTransformer</span> <span class="o">{</span> |
| <span class="k">case</span> <span class="k">object</span> <span class="nc">Mean</span> <span class="k">extends</span> <span class="nc">Parameter</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">val</span> <span class="n">defaultValue</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Some</span><span class="o">(</span><span class="mf">0.0</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="k">def</span> <span class="n">apply</span><span class="o">()</span><span class="k">:</span> <span class="kt">MeanTransformer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">MeanTransformer</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>Parameters are defined in the companion object of the transformer class and extend the <code>Parameter</code> class. |
| Since the parameter instances are supposed to act as immutable keys for a parameter map, they should be implemented as <code>case object</code>s. |
| The default value will be used if no other value has been set by the user of this component. |
| If no default value has been specified, meaning that <code>defaultValue = None</code>, then the algorithm has to handle this situation accordingly.</p> |
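<p>The role of the case objects can be illustrated with a simplified, hypothetical stand-in for <code>Parameter</code> and <code>ParameterMap</code> (not FlinkML’s actual classes): values are stored under the case-object keys, and a lookup falls back to the parameter’s <code>defaultValue</code> when nothing has been set. A parameter with <code>defaultValue = None</code> then yields <code>None</code>, which the algorithm has to handle. <code>MaxIterations</code> is a hypothetical parameter added to show that case.</p>

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-in for FlinkML's Parameter.
trait Parameter[T] {
  val defaultValue: Option[T]
}

// Simplified, hypothetical stand-in for FlinkML's ParameterMap.
class ParameterMap {
  private val entries = mutable.Map.empty[Parameter[_], Any]

  // Stores a value under its (immutable) key and returns the map for chaining.
  def add[T](parameter: Parameter[T], value: T): ParameterMap = {
    entries(parameter) = value
    this
  }

  // Returns the value set by the user, or else the parameter's default (which may be None).
  def get[T](parameter: Parameter[T]): Option[T] =
    entries.get(parameter).map(_.asInstanceOf[T]).orElse(parameter.defaultValue)
}

object MeanTransformerParams {
  // Case objects are singletons with stable equality and hashing, so they are safe map keys.
  case object Mean extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(0.0)
  }

  // A hypothetical parameter without a default value.
  case object MaxIterations extends Parameter[Int] {
    override val defaultValue: Option[Int] = None
  }
}
```

<p>Because <code>Parameter</code> is invariant in <code>T</code>, a call such as <code>params.add(Mean, "five")</code> does not compile in this sketch.</p>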
| |
| <p>We can now instantiate a <code>MeanTransformer</code> object and set the mean value of the transformed data. |
| But we still have to implement how the transformation works. |
| The workflow can be separated into two phases. |
| In the first phase, the transformer learns the mean of the given training data. |
| In the second phase, this knowledge is used to transform the provided data with respect to the configured resulting mean value.</p> |
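<p>Written out, the two phases amount to learning the per-dimension mean $\hat{\mu}$ of the training data and then mapping every vector $\x$ to $\x - \hat{\mu} + m\one$, where $m$ is the configured mean. A plain-Scala sketch of that arithmetic follows; <code>Vector[Double]</code> stands in for FlinkML’s vector type, and <code>MeanShift</code>, <code>learnMean</code> and <code>shiftToMean</code> are hypothetical names, not part of FlinkML.</p>

```scala
object MeanShift {
  // Phase 1 (fit): learn the per-dimension mean of the training vectors.
  def learnMean(training: Seq[Vector[Double]]): Vector[Double] = {
    require(training.nonEmpty, "training data must not be empty")
    val n = training.size.toDouble
    val sums = training.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    sums.map(_ / n)
  }

  // Phase 2 (transform): x' = x - learnedMean + targetMean, applied component-wise.
  def shiftToMean(
      data: Seq[Vector[Double]],
      learnedMean: Vector[Double],
      targetMean: Double): Seq[Vector[Double]] =
    data.map(v => v.zip(learnedMean).map { case (x, mu) => x - mu + targetMean })
}
```

<p>Applied to the training data itself, the shifted vectors have mean $m$ in every dimension.</p>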
| |
| <p>The learning of the mean can be implemented within the <code>fit</code> operation of our <code>Transformer</code>, which it inherits from <code>Estimator</code>. |
| Within the <code>fit</code> operation, a pipeline component is trained with respect to the given training data. |
| The algorithm is, however, <strong>not</strong> implemented by overriding the <code>fit</code> method but by providing an implementation of a corresponding <code>FitOperation</code> for the correct type. |
| Taking a look at the definition of the <code>fit</code> method in <code>Estimator</code>, which is the parent class of <code>Transformer</code>, reveals why this is the case.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">trait</span> <span class="nc">Estimator</span><span class="o">[</span><span class="kt">Self</span><span class="o">]</span> <span class="k">extends</span> <span class="nc">WithParameters</span> <span class="k">with</span> <span class="nc">Serializable</span> <span class="o">{</span> |
| <span class="n">that</span><span class="k">:</span> <span class="kt">Self</span> <span class="o">=></span> |
| |
| <span class="k">def</span> <span class="n">fit</span><span class="o">[</span><span class="kt">Training</span><span class="o">](</span> |
| <span class="n">training</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Training</span><span class="o">],</span> |
| <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span> <span class="o">=</span> <span class="nc">ParameterMap</span><span class="o">.</span><span class="nc">Empty</span><span class="o">)</span> |
| <span class="o">(</span><span class="k">implicit</span> <span class="n">fitOperation</span><span class="k">:</span> <span class="kt">FitOperation</span><span class="o">[</span><span class="kt">Self</span>, <span class="kt">Training</span><span class="o">])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="nc">FlinkMLTools</span><span class="o">.</span><span class="n">registerFlinkMLTypes</span><span class="o">(</span><span class="n">training</span><span class="o">.</span><span class="n">getExecutionEnvironment</span><span class="o">)</span> |
| <span class="n">fitOperation</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="k">this</span><span class="o">,</span> <span class="n">fitParameters</span><span class="o">,</span> <span class="n">training</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>We see that the <code>fit</code> method is called with an input data set whose elements are of type <code>Training</code> and an optional parameter map. In a second parameter list, it also takes an implicit parameter of type <code>FitOperation[Self, Training]</code>. |
| Within the body of the method, some machine learning types are first registered with the execution environment. Then the <code>fit</code> method of the <code>FitOperation</code> parameter is called. |
| The instance passes itself, the parameter map and the training data set as arguments to this method. |
| Thus, all the program logic takes place within the <code>FitOperation</code>.</p> |
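| |
| <p>The delegation can be reproduced in a few lines of plain Scala. In the following simplified model, all names are hypothetical and <code>Seq</code> stands in for <code>DataSet</code>. Here <code>fit</code> contains no algorithm of its own. The compiler supplies the <code>SimpleFitOperation[Self, Training]</code> instance that matches both the estimator type and the element type. It finds the instance because the instance is defined in the estimator's companion object, which is part of the implicit scope the compiler searches.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Hypothetical model of FitOperation: one instance per (estimator, element type) pair. |
| trait SimpleFitOperation[Self, Training] { |
|   def fit(instance: Self, training: Seq[Training]): Unit |
| } |
| |
| trait SimpleEstimator[Self] { that: Self => |
|   // No algorithm here: the implicit instance carries all of the logic. |
|   def fit[Training](training: Seq[Training])( |
|       implicit fitOperation: SimpleFitOperation[Self, Training]): Unit = |
|     fitOperation.fit(this, training) |
| } |
| |
| class Counter extends SimpleEstimator[Counter] { |
|   var seen: Int = 0 |
| } |
| |
| object Counter { |
|   // Found automatically because it lives in Counter's companion object. |
|   implicit val stringFit: SimpleFitOperation[Counter, String] = |
|     new SimpleFitOperation[Counter, String] { |
|       def fit(instance: Counter, training: Seq[String]): Unit = |
|         instance.seen = training.size |
|     } |
| } |
| |
| val counter = new Counter |
| counter.fit(Seq("a", "b", "c")) |
| println(counter.seen) // 3</code></pre></div> |
| |
| <p>In this model, the compiler would reject <code>counter.fit(Seq(1, 2))</code> because no instance for <code>Int</code> exists.</p> |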
| |
| <p>The <code>FitOperation</code> has two type parameters. |
| The first defines the pipeline operator type for which this <code>FitOperation</code> works, and the second defines the type of the data set elements. |
| If we want the <code>MeanTransformer</code> to work on <code>DenseVector</code> first, we therefore have to provide an implementation for <code>FitOperation[MeanTransformer, DenseVector]</code>.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">denseVectorMeanFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">def</span> <span class="n">fit</span><span class="o">(</span><span class="n">instance</span><span class="k">:</span> <span class="kt">MeanTransformer</span><span class="o">,</span> <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">])</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Breeze._</span> |
| <span class="k">val</span> <span class="n">meanTrainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="n">input</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">x</span> <span class="k">=></span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">asBreeze</span><span class="o">,</span> <span class="mf">1.0</span><span class="o">)</span> <span class="o">}</span> |
| <span class="o">.</span><span class="n">reduce</span><span class="o">{</span> |
| <span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span> <span class="k">=></span> |
| <span class="o">(</span><span class="n">left</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">left</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">p</span> <span class="k">=></span> <span class="o">(</span><span class="n">p</span><span class="o">.</span><span class="n">_1</span><span class="o">/</span><span class="n">p</span><span class="o">.</span><span class="n">_2</span><span class="o">).</span><span class="n">fromBreeze</span> <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>A <code>FitOperation[T, I]</code> has a <code>fit</code> method which is called with an instance of type <code>T</code>, a parameter map and an input <code>DataSet[I]</code>. |
| In our case <code>T=MeanTransformer</code> and <code>I=DenseVector</code>. |
| The parameter map is necessary if our fit step depends on some parameter values which were not given directly at creation time of the <code>Transformer</code>. |
| The <code>FitOperation</code> of the <code>MeanTransformer</code> sums up the <code>DenseVector</code> instances of the given input data set and divides the result by the total number of vectors. |
| That way, we obtain a <code>DataSet[DenseVector]</code> with a single element which is the mean value.</p> |
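| |
| <p>Each element carries a count next to its running sum, so the whole computation fits into a single <code>reduce</code>. Element-wise addition of (sum, count) pairs is associative and commutative. A parallel reduce can therefore combine partial results, for example per partition first, in any order. The following Flink-free check on a local collection illustrates this. <code>combine</code> is a hypothetical helper, and <code>Array[Double]</code> stands in for the Breeze vectors.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Combines two (component-wise sum, count) pairs, the same shape as the (vector, count) pairs above. |
| def combine( |
|     left: (Array[Double], Double), |
|     right: (Array[Double], Double)): (Array[Double], Double) = |
|   (left._1.zip(right._1).map { case (a, b) => a + b }, left._2 + right._2) |
| |
| val vectors = Seq(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0), Array(7.0, 8.0)) |
| val pairs = vectors.map(v => (v, 1.0)) |
| |
| // One sequential reduction versus two "partitions" reduced separately and then merged. |
| val sequential = pairs.reduce((l, r) => combine(l, r)) |
| val (firstHalf, secondHalf) = pairs.splitAt(2) |
| val partitioned = combine( |
|   firstHalf.reduce((l, r) => combine(l, r)), |
|   secondHalf.reduce((l, r) => combine(l, r))) |
| |
| val mean = sequential._1.map(_ / sequential._2) |
| println(mean.mkString(", "))                        // 4.0, 5.0 |
| println(partitioned._1.sameElements(sequential._1)) // true</code></pre></div> |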
| |
| <p>But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere. |
| If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around. |
| This is where the <code>MeanTransformer</code> instance that is passed to the <code>fit</code> method comes into play. |
| We can use this instance to store state, which a subsequent <code>transform</code> operation on the same object can then use. |
| But first we have to add a member field to <code>MeanTransformer</code> and then adjust the <code>FitOperation</code> implementation.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MeanTransformer</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">MeanTransformer</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">var</span> <span class="n">meanOption</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]]</span> <span class="k">=</span> <span class="nc">None</span> |
| |
| <span class="k">def</span> <span class="n">setMean</span><span class="o">(</span><span class="n">mean</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span><span class="k">:</span> <span class="kt">MeanTransformer</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">parameters</span><span class="o">.</span><span class="n">add</span><span class="o">(</span><span class="nc">MeanTransformer</span><span class="o">.</span><span class="nc">Mean</span><span class="o">,</span> <span class="n">mean</span><span class="o">)</span> |
| <span class="c1">// add returns the ParameterMap, so return the transformer itself to allow call chaining</span> |
| <span class="k">this</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">val</span> <span class="n">denseVectorMeanFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">def</span> <span class="n">fit</span><span class="o">(</span><span class="n">instance</span><span class="k">:</span> <span class="kt">MeanTransformer</span><span class="o">,</span> <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">])</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Breeze._</span> |
| |
| <span class="n">instance</span><span class="o">.</span><span class="n">meanOption</span> <span class="k">=</span> <span class="nc">Some</span><span class="o">(</span><span class="n">input</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">x</span> <span class="k">=></span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">asBreeze</span><span class="o">,</span> <span class="mf">1.0</span><span class="o">)</span> <span class="o">}</span> |
| <span class="o">.</span><span class="n">reduce</span><span class="o">{</span> |
| <span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span> <span class="k">=></span> |
| <span class="o">(</span><span class="n">left</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">left</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">p</span> <span class="k">=></span> <span class="o">(</span><span class="n">p</span><span class="o">.</span><span class="n">_1</span><span class="o">/</span><span class="n">p</span><span class="o">.</span><span class="n">_2</span><span class="o">).</span><span class="n">fromBreeze</span> <span class="o">})</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
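| |
| <p>The same idea can be shown without Flink: an operation object writes its result into the instance it was handed. In this hypothetical model, <code>MeanModel.meanOption</code> plays the role of <code>MeanTransformer.meanOption</code>, and a local <code>Array[Double]</code> replaces the <code>DataSet[DenseVector]</code>. <code>fitInto</code> is an illustrative stand-in for the <code>FitOperation</code>. A later step reads the stored value and fails if <code>fit</code> was never called.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Hypothetical stand-in for MeanTransformer: the fitted mean is kept on the instance. |
| class MeanModel { |
|   var meanOption: Option[Array[Double]] = None |
| } |
| |
| // Stand-in for the FitOperation: computes the mean and stores it in the instance. |
| def fitInto(instance: MeanModel, input: Seq[Array[Double]]): Unit = { |
|   val sums = input.reduce((l, r) => l.zip(r).map { case (a, b) => a + b }) |
|   instance.meanOption = Some(sums.map(_ / input.size)) |
| } |
| |
| // A later step (like the transform operation) reads the stored state. |
| def storedMean(instance: MeanModel): Array[Double] = instance.meanOption match { |
|   case Some(mean) => mean |
|   case None => throw new IllegalStateException("MeanModel has not been fitted to data.") |
| } |
| |
| val model = new MeanModel |
| fitInto(model, Seq(Array(2.0, 4.0), Array(4.0, 8.0))) |
| println(storedMean(model).mkString(", ")) // 3.0, 6.0</code></pre></div> |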
| |
| <p>If we look at the <code>transform</code> method in <code>Transformer</code>, we will see that we also need an implementation of <code>TransformOperation</code>. |
| A possible mean transforming implementation could look like the following.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">denseVectorMeanTransformOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">def</span> <span class="n">transform</span><span class="o">(</span> |
| <span class="n">instance</span><span class="k">:</span> <span class="kt">MeanTransformer</span><span class="o">,</span> |
| <span class="n">transformParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> |
| <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">])</span> |
| <span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">resultingParameters</span> <span class="k">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">parameters</span> <span class="o">++</span> <span class="n">transformParameters</span> |
| |
| <span class="k">val</span> <span class="n">resultingMean</span> <span class="k">=</span> <span class="n">resultingParameters</span><span class="o">(</span><span class="nc">MeanTransformer</span><span class="o">.</span><span class="nc">Mean</span><span class="o">)</span> |
| |
| <span class="n">instance</span><span class="o">.</span><span class="n">meanOption</span> <span class="k">match</span> <span class="o">{</span> |
| <span class="k">case</span> <span class="nc">Some</span><span class="o">(</span><span class="n">trainingMean</span><span class="o">)</span> <span class="k">=></span> <span class="o">{</span> |
| <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="k">new</span> <span class="nc">MeanTransformMapper</span><span class="o">(</span><span class="n">resultingMean</span><span class="o">)</span> <span class="o">}.</span><span class="n">withBroadcastSet</span><span class="o">(</span><span class="n">trainingMean</span><span class="o">,</span> <span class="s">"trainingMean"</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="k">case</span> <span class="nc">None</span> <span class="k">=></span> <span class="k">throw</span> <span class="k">new</span> <span class="nc">RuntimeException</span><span class="o">(</span><span class="s">"MeanTransformer has not been fitted to data."</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="k">class</span> <span class="nc">MeanTransformMapper</span><span class="o">(</span><span class="n">resultingMean</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">RichMapFunction</span><span class="o">[</span><span class="kt">DenseVector</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span> |
| <span class="k">var</span> <span class="n">trainingMean</span><span class="k">:</span> <span class="kt">DenseVector</span> <span class="o">=</span> <span class="kc">null</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">open</span><span class="o">(</span><span class="n">parameters</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">trainingMean</span> <span class="k">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="n">getBroadcastVariable</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">](</span><span class="s">"trainingMean"</span><span class="o">).</span><span class="n">get</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="k">override</span> <span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">vector</span><span class="k">:</span> <span class="kt">DenseVector</span><span class="o">)</span><span class="k">:</span> <span class="kt">DenseVector</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Breeze._</span> |
| |
| <span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">vector</span><span class="o">.</span><span class="n">asBreeze</span> <span class="o">-</span> <span class="n">trainingMean</span><span class="o">.</span><span class="n">asBreeze</span> <span class="o">+</span> <span class="n">resultingMean</span> |
| |
| <span class="n">result</span><span class="o">.</span><span class="n">fromBreeze</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
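| |
| <p>The transform operation combines the parameters stored on the instance with those passed to <code>transform</code>. The order of the operands decides which value wins. The sketch below models this with plain Scala <code>Map</code>s keyed by strings, a simplification of FlinkML's typed <code>ParameterMap</code>. The <code>lookup</code> helper is hypothetical. For Scala's <code>Map</code>, <code>++</code> keeps the right operand's value on duplicate keys, so values passed at transform time override those set on the instance. We assume here that <code>ParameterMap.++</code> follows the same rule.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Simplified model: string keys instead of FlinkML's typed Parameter objects. |
| val instanceParameters = Map("mean" -> 1.0)   // set on the transformer, e.g. via setMean |
| val transformParameters = Map("mean" -> 5.0)  // passed to transform for this call only |
| |
| // Falls back to a default when the map does not contain the key. |
| def lookup(params: Map[String, Double], key: String, default: Double): Double = |
|   params.getOrElse(key, default) |
| |
| // Map.++ keeps the right operand's value for duplicate keys. |
| val overridden = instanceParameters ++ transformParameters |
| val notOverridden = instanceParameters ++ Map.empty[String, Double] |
| |
| println(lookup(overridden, "mean", 0.0))                // 5.0 |
| println(lookup(notOverridden, "mean", 0.0))             // 1.0 |
| println(lookup(Map.empty[String, Double], "mean", 0.0)) // 0.0</code></pre></div> |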
| |
| <p>Now we have everything implemented to fit our <code>MeanTransformer</code> to a training data set of <code>DenseVector</code> instances and to transform them. |
| However, when we execute the <code>fit</code> operation</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">trainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> |
| <span class="k">val</span> <span class="n">meanTransformer</span> <span class="k">=</span> <span class="nc">MeanTransformer</span><span class="o">()</span> |
| |
| <span class="n">meanTransformer</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">trainingData</span><span class="o">)</span></code></pre></div> |
| |
| <p>we receive the following error at runtime: <code>"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"</code>. |
| The reason is that the Scala compiler could not find a fitting <code>FitOperation</code> value with the right type parameters for the implicit parameter of the <code>fit</code> method. |
| Therefore, it chose a fallback implicit value which gives you this error message at runtime. |
| In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of <code>MeanTransformer</code>'s companion object.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">MeanTransformer</span><span class="o">{</span> |
| <span class="k">implicit</span> <span class="k">val</span> <span class="n">denseVectorMeanFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">...</span> |
| |
| <span class="k">implicit</span> <span class="k">val</span> <span class="n">denseVectorMeanTransformOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">...</span> |
| <span class="o">}</span></code></pre></div> |
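| |
| <p>The lookup described above can be reproduced without Flink. In this hypothetical model, all names are invented for illustration and <code>Seq</code> replaces <code>DataSet</code>. A generic fallback instance lives in the type class's companion object, and a specific instance lives in the model's companion object. When both apply, Scala picks the more specific, non-generic instance. When only the fallback applies, the program still compiles, and the error surfaces when <code>fit</code> is called. This mirrors the behaviour described in the text, but FlinkML's own fallback may differ in its details.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">import scala.reflect.ClassTag |
| |
| // Hypothetical model of FitOperation (Seq stands in for DataSet). |
| trait SimpleFitOperation[Self, Training] { |
|   def fit(instance: Self, training: Seq[Training]): Unit |
| } |
| |
| object SimpleFitOperation { |
|   // Generic fallback: it matches every type pair, so a missing instance still |
|   // compiles, but calling fit fails before any data is processed. |
|   implicit def fallback[Self, Training]( |
|       implicit selfTag: ClassTag[Self], |
|       trainingTag: ClassTag[Training]): SimpleFitOperation[Self, Training] = |
|     new SimpleFitOperation[Self, Training] { |
|       def fit(instance: Self, training: Seq[Training]): Unit = |
|         throw new RuntimeException( |
|           s"There is no FitOperation defined for ${selfTag.runtimeClass.getName} " + |
|             s"which trains on a Seq[${trainingTag.runtimeClass.getName}]") |
|     } |
| } |
| |
| class MeanModel { |
|   var fitted: Boolean = false |
| |
|   def fit[T](training: Seq[T])(implicit op: SimpleFitOperation[MeanModel, T]): Unit = |
|     op.fit(this, training) |
| } |
| |
| object MeanModel { |
|   // Specific instance in the companion object, preferred over the generic fallback. |
|   implicit val doubleFit: SimpleFitOperation[MeanModel, Double] = |
|     new SimpleFitOperation[MeanModel, Double] { |
|       def fit(instance: MeanModel, training: Seq[Double]): Unit = instance.fitted = true |
|     } |
| } |
| |
| val model = new MeanModel |
| model.fit(Seq(1.0, 2.0))                     // resolved to MeanModel.doubleFit |
| println(model.fitted)                        // true |
| println(scala.util.Try(model.fit(Seq("a")))) // a Failure carrying the fallback's RuntimeException</code></pre></div> |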
| |
| <p>Now we can call <code>fit</code> and <code>transform</code> of our <code>MeanTransformer</code> with <code>DataSet[DenseVector]</code> as input. |
| Furthermore, we can now use this transformer as part of an analysis pipeline whose input and expected output are of type <code>DenseVector</code>.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">trainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> |
| |
| <span class="k">val</span> <span class="n">mean</span> <span class="k">=</span> <span class="nc">MeanTransformer</span><span class="o">().</span><span class="n">setMean</span><span class="o">(</span><span class="mf">1.0</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">polyFeatures</span> <span class="k">=</span> <span class="nc">PolynomialFeatures</span><span class="o">().</span><span class="n">setDegree</span><span class="o">(</span><span class="mi">3</span><span class="o">)</span> |
| |
| <span class="k">val</span> <span class="n">pipeline</span> <span class="k">=</span> <span class="n">mean</span><span class="o">.</span><span class="n">chainTransformer</span><span class="o">(</span><span class="n">polyFeatures</span><span class="o">)</span> |
| |
| <span class="n">pipeline</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">trainingData</span><span class="o">)</span></code></pre></div> |
| |
| <p>It is noteworthy that there is no additional code needed to enable chaining. |
| The system automatically constructs the pipeline logic using the operations of the individual components.</p> |
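| |
| <p>One way to obtain chaining without extra code in the components is to let the compiler derive the operation of a chained pair from the operations of its parts. The following Flink-free sketch models that idea. <code>SimpleTransformOperation</code>, <code>Chained</code>, <code>AddOne</code> and <code>ToText</code> are hypothetical names, and FlinkML's own chaining classes may be organized differently.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Hypothetical model of TransformOperation (Seq stands in for DataSet). |
| trait SimpleTransformOperation[Instance, Input, Output] { |
|   def transform(instance: Instance, input: Seq[Input]): Seq[Output] |
| } |
| |
| // A pipeline of two components; no operations are written for it by hand. |
| case class Chained[L, R](left: L, right: R) |
| |
| object Chained { |
|   // The compiler assembles the chained operation from the component operations: |
|   // the left one turns I into M, the right one turns M into O. |
|   implicit def chainedTransform[L, R, I, M, O]( |
|       implicit leftOp: SimpleTransformOperation[L, I, M], |
|       rightOp: SimpleTransformOperation[R, M, O]): SimpleTransformOperation[Chained[L, R], I, O] = |
|     new SimpleTransformOperation[Chained[L, R], I, O] { |
|       def transform(instance: Chained[L, R], input: Seq[I]): Seq[O] = |
|         rightOp.transform(instance.right, leftOp.transform(instance.left, input)) |
|     } |
| } |
| |
| class AddOne |
| object AddOne { |
|   implicit val addOneOp: SimpleTransformOperation[AddOne, Int, Int] = |
|     new SimpleTransformOperation[AddOne, Int, Int] { |
|       def transform(instance: AddOne, input: Seq[Int]): Seq[Int] = input.map(_ + 1) |
|     } |
| } |
| |
| class ToText |
| object ToText { |
|   implicit val toTextOp: SimpleTransformOperation[ToText, Int, String] = |
|     new SimpleTransformOperation[ToText, Int, String] { |
|       def transform(instance: ToText, input: Seq[Int]): Seq[String] = input.map(i => s"#$i") |
|     } |
| } |
| |
| val pipeline = Chained(new AddOne, new ToText) |
| val chainedOp = implicitly[SimpleTransformOperation[Chained[AddOne, ToText], Int, String]] |
| println(chainedOp.transform(pipeline, Seq(1, 2, 3))) // List(#2, #3, #4)</code></pre></div> |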
| |
| <p>So far everything works fine with <code>DenseVector</code>. |
| But what happens if we call our transformer with <code>LabeledVector</code> instead?</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">trainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">LabeledVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> |
| |
| <span class="k">val</span> <span class="n">mean</span> <span class="k">=</span> <span class="nc">MeanTransformer</span><span class="o">()</span> |
| |
| <span class="n">mean</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">trainingData</span><span class="o">)</span></code></pre></div> |
| |
| <p>As before, we see the following exception upon execution of the program: <code>"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"</code>. |
| It is noteworthy that this exception is thrown in the pre-flight phase, which means that the job has not yet been submitted to the runtime system. |
| This has the advantage that you won’t see a job which runs for a couple of days and then fails because of an incompatible pipeline component. |
| Type compatibility is, thus, checked at the very beginning for the complete job.</p> |
| |
| <p>In order to make the <code>MeanTransformer</code> work on <code>LabeledVector</code> as well, we have to provide the corresponding operations. |
| Consequently, we have to define a <code>FitOperation[MeanTransformer, LabeledVector]</code> and <code>TransformOperation[MeanTransformer, LabeledVector, LabeledVector]</code> as implicit values in the scope of <code>MeanTransformer</code>’s companion object.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">MeanTransformer</span> <span class="o">{</span> |
| <span class="k">implicit</span> <span class="k">val</span> <span class="n">labeledVectorFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">LabeledVector</span><span class="o">]</span> <span class="o">...</span> |
| |
| <span class="k">implicit</span> <span class="k">val</span> <span class="n">labeledVectorTransformOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">LabeledVector</span>, <span class="kt">LabeledVector</span><span class="o">]</span> <span class="o">...</span> |
| <span class="o">}</span></code></pre></div> |
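| |
| <p>One convenient way to support a second element type is to reuse the existing logic. The fit step works on the features only, and the transform shifts the features while keeping the label unchanged. The following Flink-free sketch illustrates this design choice with hypothetical helpers. The case class <code>Labeled</code> stands in for FlinkML's <code>LabeledVector</code>, and <code>Seq[Double]</code> stands in for its feature vector.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Hypothetical stand-in for LabeledVector: a label plus its features. |
| case class Labeled(label: Double, features: Seq[Double]) |
| |
| // Feature-only logic, as already written for DenseVector. |
| def meanOf(vectors: Seq[Seq[Double]]): Seq[Double] = |
|   vectors.reduce((l, r) => l.zip(r).map { case (a, b) => a + b }).map(_ / vectors.size) |
| |
| def shift(v: Seq[Double], trainingMean: Seq[Double], resultingMean: Double): Seq[Double] = |
|   v.zip(trainingMean).map { case (x, m) => x - m + resultingMean } |
| |
| // Labeled variants: project to the features, reuse the logic above, keep the label. |
| def fitLabeled(data: Seq[Labeled]): Seq[Double] = meanOf(data.map(_.features)) |
| |
| def transformLabeled( |
|     data: Seq[Labeled], |
|     trainingMean: Seq[Double], |
|     resultingMean: Double): Seq[Labeled] = |
|   data.map(p => p.copy(features = shift(p.features, trainingMean, resultingMean))) |
| |
| val labeled = Seq(Labeled(0.0, Seq(1.0, 3.0)), Labeled(1.0, Seq(3.0, 5.0))) |
| val trainingMean = fitLabeled(labeled) |
| println(trainingMean) // List(2.0, 4.0) |
| println(transformLabeled(labeled, trainingMean, 0.0))</code></pre></div> |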
| |
| <p>If we wanted to implement a <code>Predictor</code> instead of a <code>Transformer</code>, then we would have to provide a <code>FitOperation</code>, too. |
| Moreover, a <code>Predictor</code> requires a <code>PredictOperation</code> which implements how predictions are calculated from testing data.</p> |
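| |
| <p>For a <code>Predictor</code>, the same pattern applies with one more type class. In the following Flink-free model, <code>SimplePredictOperation</code>, <code>SimplePredictor</code> and <code>MeanPredictor</code> are hypothetical names. Their signatures are simplified compared with FlinkML's <code>PredictOperation</code>. The fit operation stores the mean of the training labels on the instance. The predict operation then uses this state to emit <code>(input, prediction)</code> pairs.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Hypothetical models of FitOperation and PredictOperation (Seq stands in for DataSet). |
| trait SimpleFitOperation[Self, Training] { |
|   def fit(instance: Self, training: Seq[Training]): Unit |
| } |
| |
| trait SimplePredictOperation[Self, Testing, Prediction] { |
|   def predict(instance: Self, testing: Seq[Testing]): Seq[Prediction] |
| } |
| |
| trait SimplePredictor[Self] { that: Self => |
|   def fit[T](training: Seq[T])(implicit op: SimpleFitOperation[Self, T]): Unit = |
|     op.fit(this, training) |
| |
|   def predict[T, P](testing: Seq[T])(implicit op: SimplePredictOperation[Self, T, P]): Seq[P] = |
|     op.predict(this, testing) |
| } |
| |
| // Predicts the mean of the training labels for every input. |
| class MeanPredictor extends SimplePredictor[MeanPredictor] { |
|   var meanLabel: Option[Double] = None |
| } |
| |
| object MeanPredictor { |
|   // Training elements are (feature, label) pairs; only the labels are used. |
|   implicit val fitOp: SimpleFitOperation[MeanPredictor, (Double, Double)] = |
|     new SimpleFitOperation[MeanPredictor, (Double, Double)] { |
|       def fit(instance: MeanPredictor, training: Seq[(Double, Double)]): Unit = |
|         instance.meanLabel = Some(training.map(_._2).sum / training.size) |
|     } |
| |
|   // Emits (input, prediction) pairs using the state stored by fit. |
|   implicit val predictOp: SimplePredictOperation[MeanPredictor, Double, (Double, Double)] = |
|     new SimplePredictOperation[MeanPredictor, Double, (Double, Double)] { |
|       def predict(instance: MeanPredictor, testing: Seq[Double]): Seq[(Double, Double)] = { |
|         val mean = instance.meanLabel.getOrElse( |
|           throw new IllegalStateException("MeanPredictor has not been fitted to data.")) |
|         testing.map(x => (x, mean)) |
|       } |
|     } |
| } |
| |
| val predictor = new MeanPredictor |
| predictor.fit(Seq((1.0, 2.0), (2.0, 4.0))) |
| println(predictor.predict(Seq(10.0))) // List((10.0,3.0))</code></pre></div> |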
| |
| |
| </div> |
| |
| <div class="col-sm-10 col-sm-offset-1"> |
| <!-- Disqus thread and some vertical offset --> |
| <div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div> |
| </div> |
| </div> |
| |
| </div><!-- /.container --> |
| |
| <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script> |
| <!-- Include all compiled plugins (below), or include individual files as needed --> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script> |
| <script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script> |
| |
| <!-- Google Analytics --> |
| <script> |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); |
| |
| ga('create', 'UA-52545728-1', 'auto'); |
| ga('send', 'pageview'); |
| </script> |
| |
| <!-- Disqus --> |
| <script type="text/javascript"> |
| var disqus_shortname = 'stratosphere-eu'; |
| (function() { |
| var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true; |
| dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js'; |
| (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq); |
| })(); |
| </script> |
| </body> |
| </html> |