<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.9.0 Documentation: FlinkML - Looking under the hood of pipelines</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css">
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
tex2jax: {
inlineMath: [['$','$'], ['\\(','\\)']] },
TeX: {
equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script type="text/javascript"
src="https://cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML">
</script>
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!--Some of the Latex math notation has been adapted from Apache Spark MLlib's documentation-->
$$
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\newcommand\rfrac[2]{^{#1}\!/_{#2}}
\newcommand{\norm}[1]{\left\lVert#1\right\rVert}
$$
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1><a href="../ml">FlinkML</a> - Looking under the hood of pipelines</h1>
<ul id="markdown-toc">
<li><a href="#introduction" id="markdown-toc-introduction">Introduction</a></li>
<li><a href="#the-what-and-the-why" id="markdown-toc-the-what-and-the-why">The what and the why</a></li>
<li><a href="#pipelines-in-flinkml" id="markdown-toc-pipelines-in-flinkml">Pipelines in FlinkML</a> <ul>
<li><a href="#scala-implicits" id="markdown-toc-scala-implicits">Scala implicits</a></li>
<li><a href="#operations" id="markdown-toc-operations">Operations</a></li>
<li><a href="#chaining" id="markdown-toc-chaining">Chaining</a></li>
<li><a href="#how-to-implement-a-pipeline-operator" id="markdown-toc-how-to-implement-a-pipeline-operator">How to Implement a Pipeline Operator</a></li>
</ul>
</li>
</ul>
<h2 id="introduction">Introduction</h2>
<p>The ability to chain together different transformers and predictors is an important feature of
any Machine Learning (ML) library. In FlinkML we wanted to provide an intuitive API while
using the capabilities of the Scala language to make our pipelines type-safe.
The goal is an easy-to-use API that protects users from type errors at pre-flight time
(before the job is launched), so that long-running jobs are not submitted to the cluster
only to fail because of an error in the series of data transformations that commonly
make up an ML pipeline.</p>
<p>In this guide we describe the choices we made during the implementation of chainable
transformers and predictors in FlinkML, and provide guidelines on how developers can create their
own algorithms that make use of these capabilities.</p>
<h2 id="the-what-and-the-why">The what and the why</h2>
<p>So what do we mean by “ML pipelines”? Pipelines in the ML context can be thought of as chains of
operations that take some data as input, perform a number of transformations on that data,
and then output the transformed data, either to be used as the input (features) of a predictor
function, such as a learning model, or to be used directly in some other task.
The end learner can of course be a part of the pipeline as well.
ML pipelines can often be complicated sets of operations (<a href="http://research.google.com/pubs/pub43146.html">in-depth explanation</a>) and
can become sources of errors for end-to-end learning systems.</p>
<p>The purpose of ML pipelines is to provide a
framework for managing the complexity introduced by these chains of operations.
Pipelines should make it easy for developers to define chained transformations that are
applied to the training data in order to create the features used to train a
learning model, and then to apply the same set of transformations just as easily to unlabeled
(test) data. Pipelines should also simplify cross-validation and model selection on
these chains of operations.</p>
<p>Finally, by ensuring that the consecutive links in the pipeline chain “fit together” we also
avoid costly type errors. Since each step in a pipeline can be a computationally-heavy operation,
we want to avoid running a pipelined job, unless we are sure that all the input/output pairs in a
pipeline “fit”.</p>
<h2 id="pipelines-in-flinkml">Pipelines in FlinkML</h2>
<p>The building blocks for pipelines in FlinkML can be found in the <code>ml.pipeline</code> package.
FlinkML follows an API inspired by <a href="http://scikit-learn.org">sklearn</a> which means that we have
<code>Estimator</code>, <code>Transformer</code> and <code>Predictor</code> interfaces. For an in-depth look at the design of the
sklearn API the interested reader is referred to <a href="http://arxiv.org/abs/1309.0238">this</a> paper.
In short, the <code>Estimator</code> is the base class from which <code>Transformer</code> and <code>Predictor</code> inherit.
<code>Estimator</code> defines a <code>fit</code> method, and <code>Transformer</code> also defines a <code>transform</code> method and
<code>Predictor</code> defines a <code>predict</code> method.</p>
<p>The <code>fit</code> method of the <code>Estimator</code> performs the actual training of the model, for example
finding the correct weights in a linear regression task, or the mean and standard deviation of
the data in a feature scaler.
As the names suggest, classes that implement
<code>Transformer</code> are transform operations like <a href="standard_scaler.html">scaling the input</a> and
<code>Predictor</code> implementations are learning algorithms such as <a href="http://flink.apache.org/docs/0.9/libs/ml/multiple_linear_regression.html">Multiple Linear Regression</a>.
Pipelines can be created by chaining together a number of Transformers, and the final link in a pipeline can be a Predictor or another Transformer.
Pipelines that end with a Predictor cannot be chained any further.
Below is an example of how a pipeline can be formed:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Training data</span>
<span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">LabeledVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="c1">// Test data</span>
<span class="k">val</span> <span class="n">unlabeled</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Vector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">scaler</span> <span class="k">=</span> <span class="nc">StandardScaler</span><span class="o">()</span>
<span class="k">val</span> <span class="n">polyFeatures</span> <span class="k">=</span> <span class="nc">PolynomialFeatures</span><span class="o">()</span>
<span class="k">val</span> <span class="n">mlr</span> <span class="k">=</span> <span class="nc">MultipleLinearRegression</span><span class="o">()</span>
<span class="c1">// Construct the pipeline</span>
<span class="k">val</span> <span class="n">pipeline</span> <span class="k">=</span> <span class="n">scaler</span>
<span class="o">.</span><span class="n">chainTransformer</span><span class="o">(</span><span class="n">polyFeatures</span><span class="o">)</span>
<span class="o">.</span><span class="n">chainPredictor</span><span class="o">(</span><span class="n">mlr</span><span class="o">)</span>
<span class="c1">// Train the pipeline (scaler and multiple linear regression)</span>
<span class="n">pipeline</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
<span class="c1">// Calculate predictions for the testing data</span>
<span class="k">val</span> <span class="n">predictions</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">LabeledVector</span><span class="o">]</span> <span class="k">=</span> <span class="n">pipeline</span><span class="o">.</span><span class="n">predict</span><span class="o">(</span><span class="n">unlabeled</span><span class="o">)</span></code></pre></div>
<p>As we mentioned, FlinkML pipelines are type-safe.
If we tried to chain a transformer with output of type <code>A</code> to another with input of type <code>B</code> we
would get an error at pre-flight time if <code>A</code> != <code>B</code>. FlinkML achieves this kind of type-safety
through the use of Scala’s implicits.</p>
<h3 id="scala-implicits">Scala implicits</h3>
<p>If you are not familiar with Scala’s implicits, we recommend <a href="https://www.artima.com/pins1ed/implicit-conversions-and-parameters.html">this excerpt</a>
from Martin Odersky’s “Programming in Scala”. In short, implicit conversions allow for ad-hoc
polymorphism in Scala by providing conversions from one type to another, and implicit values
provide the compiler with default values that can be supplied to function calls through implicit parameters.
The combination of implicit conversions and implicit parameters is what allows us to chain transform
and predict operations together in a type-safe manner.</p>
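<p>As a minimal illustration of the two mechanisms, the following sketch uses plain Scala without any
Flink classes. The names <code>Celsius</code>, <code>Formatter</code> and <code>describe</code> are hypothetical and exist only
for this example; they are not part of FlinkML.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">import scala.language.implicitConversions

object ImplicitsSketch {
  // Hypothetical wrapper type, used only for this illustration.
  final case class Celsius(value: Double)

  // Implicit conversion: lets a Double be used where a Celsius is expected.
  implicit def doubleToCelsius(d: Double): Celsius = Celsius(d)

  // Type class whose instances are supplied through an implicit parameter.
  trait Formatter[T] {
    def format(value: T): String
  }

  // Implicit value: passed by the compiler wherever a Formatter[Celsius] is required.
  implicit val celsiusFormatter: Formatter[Celsius] = new Formatter[Celsius] {
    def format(value: Celsius): String = s"${value.value} C"
  }

  // The second parameter list is filled in by the compiler.
  def describe[T](value: T)(implicit formatter: Formatter[T]): String =
    formatter.format(value)

  def main(args: Array[String]): Unit = {
    val temperature: Celsius = 21.5 // implicit conversion from Double to Celsius
    println(describe(temperature))  // implicit parameter resolved to celsiusFormatter
    // describe("text") would not compile: no Formatter[String] is available.
  }
}</code></pre></div>
<p>The commented-out call shows the property that pipelines rely on: when no matching implicit value
exists for a type, the mismatch is detected before any data is processed.</p>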
<h3 id="operations">Operations</h3>
<p>As we mentioned, the trait (abstract class) <code>Estimator</code> defines a <code>fit</code> method. The method has two
parameter lists (i.e. it is a <a href="http://docs.scala-lang.org/tutorials/tour/currying.html">curried function</a>). The
first parameter list takes the input (training) <code>DataSet</code> and the parameters for the estimator. The second parameter
list takes one <code>implicit</code> parameter, of type <code>FitOperation</code>. <code>FitOperation</code> is a class that also
defines a <code>fit</code> method, and this is where the actual logic of training the concrete Estimators
should be implemented. The <code>fit</code> method of <code>Estimator</code> is essentially a wrapper around the <code>fit</code>
method of <code>FitOperation</code>. The <code>predict</code> method of <code>Predictor</code> and the <code>transform</code> method of
<code>Transformer</code> are designed in a similar manner, with a respective operation class.</p>
<p>In these methods the operation object is provided as an implicit parameter.
Scala will <a href="http://docs.scala-lang.org/tutorials/FAQ/finding-implicits.html">look for implicits</a>
in the companion object of a type, so classes that implement these interfaces should provide these
objects as implicit objects inside the companion object.</p>
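<p>The lookup in the companion object can be tried out with the following self-contained sketch.
It uses simplified stand-ins for <code>Estimator</code> and <code>FitOperation</code>, with <code>Seq</code> in place of <code>DataSet</code> and
without a parameter map, so it runs without Flink. <code>MaxEstimator</code> is a hypothetical class made up for this example.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">object FitOperationSketch {
  // Simplified stand-ins for the FlinkML traits; Seq replaces DataSet.
  trait FitOperation[Self, Training] {
    def fit(instance: Self, input: Seq[Training]): Unit
  }

  trait Estimator[Self] { that: Self =&gt;
    // Wrapper: delegates to whichever FitOperation the compiler finds.
    def fit[Training](training: Seq[Training])(
        implicit fitOperation: FitOperation[Self, Training]): Unit =
      fitOperation.fit(this, training)
  }

  // Hypothetical estimator that remembers the largest value it has seen.
  class MaxEstimator extends Estimator[MaxEstimator] {
    var max: Option[Double] = None
  }

  object MaxEstimator {
    // Found automatically: the companion object of MaxEstimator belongs to the
    // implicit scope of FitOperation[MaxEstimator, Double].
    implicit val fitDoubles: FitOperation[MaxEstimator, Double] =
      new FitOperation[MaxEstimator, Double] {
        def fit(instance: MaxEstimator, input: Seq[Double]): Unit =
          instance.max = if (input.isEmpty) None else Some(input.max)
      }
  }

  def main(args: Array[String]): Unit = {
    val estimator = new MaxEstimator
    estimator.fit(Seq(1.0, 4.0, 2.5)) // no implicit has to be imported or passed
    println(estimator.max)
  }
}</code></pre></div>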
<p>As an example we can look at the <code>StandardScaler</code> class. <code>StandardScaler</code> extends <code>Transformer</code>, so it has access to its <code>fit</code> and <code>transform</code> functions.
These two functions expect objects of type <code>FitOperation</code> and <code>TransformOperation</code> as implicit parameters,
for the <code>fit</code> and <code>transform</code> methods respectively, which <code>StandardScaler</code> provides in its companion
object, through <code>fitVectorStandardScaler</code> and <code>transformVectors</code>:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">StandardScaler</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">StandardScaler</span><span class="o">]</span> <span class="o">{</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="k">object</span> <span class="nc">StandardScaler</span> <span class="o">{</span>
<span class="o">...</span>
<span class="k">implicit</span> <span class="k">def</span> <span class="n">fitVectorStandardScaler</span><span class="o">[</span><span class="kt">T</span> <span class="k">&lt;:</span> <span class="kt">Vector</span><span class="o">]</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">StandardScaler</span>, <span class="kt">T</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">fit</span><span class="o">(</span><span class="n">instance</span><span class="k">:</span> <span class="kt">StandardScaler</span><span class="o">,</span> <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span>
<span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">implicit</span> <span class="k">def</span> <span class="n">transformVectors</span><span class="o">[</span><span class="kt">T</span> <span class="k">&lt;:</span> <span class="kt">Vector:</span> <span class="kt">VectorConverter:</span> <span class="kt">TypeInformation:</span> <span class="kt">ClassTag</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">StandardScaler</span>, <span class="kt">T</span>, <span class="kt">T</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">transform</span><span class="o">(</span>
<span class="n">instance</span><span class="k">:</span> <span class="kt">StandardScaler</span><span class="o">,</span>
<span class="n">transformParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span>
<span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">])</span>
<span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">T</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Note that <code>StandardScaler</code> does <strong>not</strong> override the <code>fit</code> method of <code>Estimator</code> or the <code>transform</code>
method of <code>Transformer</code>. Rather, its implementations of <code>FitOperation</code> and <code>TransformOperation</code>
override their respective <code>fit</code> and <code>transform</code> methods, which are then called by the <code>fit</code> and
<code>transform</code> methods of <code>Estimator</code> and <code>Transformer</code>. Similarly, a class that implements
<code>Predictor</code> should define an implicit <code>PredictOperation</code> object inside its companion object.</p>
<h4 id="types-and-type-safety">Types and type safety</h4>
<p>Apart from the <code>fit</code> and <code>transform</code> operations that we listed above, the <code>StandardScaler</code> also
provides <code>fit</code> and <code>transform</code> operations for input of type <code>LabeledVector</code>.
This allows us to use the algorithm for input that is labeled or unlabeled, and this happens
automatically, depending on the type of the input that we give to the fit and transform
operations. The correct implicit operation is chosen by the compiler, depending on the input type.</p>
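<p>The selection by input type can be sketched as follows. The traits are again simplified stand-ins
that use <code>Seq</code> instead of <code>DataSet</code>; <code>Scale</code> and <code>Labeled</code> are hypothetical types introduced
only for this example and do not mirror the actual <code>StandardScaler</code> implementation.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">object TypeSelectionSketch {
  // Simplified stand-ins for the FlinkML types; Seq replaces DataSet.
  final case class Labeled(label: Double, feature: Double)

  trait TransformOperation[Self, In, Out] {
    def transform(instance: Self, input: Seq[In]): Seq[Out]
  }

  trait Transformer[Self] { that: Self =&gt;
    def transform[In, Out](input: Seq[In])(
        implicit operation: TransformOperation[Self, In, Out]): Seq[Out] =
      operation.transform(this, input)
  }

  // Hypothetical transformer that multiplies every feature by a fixed factor.
  class Scale(val factor: Double) extends Transformer[Scale]

  object Scale {
    // Operation for unlabeled input.
    implicit val scaleDoubles: TransformOperation[Scale, Double, Double] =
      new TransformOperation[Scale, Double, Double] {
        def transform(instance: Scale, input: Seq[Double]): Seq[Double] =
          input.map(_ * instance.factor)
      }

    // Operation for labeled input: the label is left untouched.
    implicit val scaleLabeled: TransformOperation[Scale, Labeled, Labeled] =
      new TransformOperation[Scale, Labeled, Labeled] {
        def transform(instance: Scale, input: Seq[Labeled]): Seq[Labeled] =
          input.map(p =&gt; p.copy(feature = p.feature * instance.factor))
      }
  }

  def main(args: Array[String]): Unit = {
    val scale = new Scale(2.0)
    println(scale.transform(Seq(1.0, 2.0)))          // resolved to scaleDoubles
    println(scale.transform(Seq(Labeled(1.0, 3.0)))) // resolved to scaleLabeled
  }
}</code></pre></div>
<p>Both calls use the same <code>transform</code> method; only the element type of the input decides which
operation the compiler passes in.</p>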
<p>If we try to call the <code>fit</code> or <code>transform</code> methods with types that are not supported we will get a
runtime error before the job is launched.
While it would be possible to catch these kinds of errors at compile time as well, the error
messages that we are able to provide the user would be much less informative, which is why we chose
to throw runtime exceptions instead.</p>
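<p>One way to obtain such a pre-flight runtime error is a catch-all implicit operation with lower priority
than the operations of the concrete classes. The sketch below illustrates the idea with simplified stand-ins
(<code>Seq</code> instead of <code>DataSet</code>, <code>ClassTag</code> for the type names, a hypothetical <code>MaxEstimator</code>);
it is an assumption-laden illustration of the technique, written for Scala 2, and not a copy of the FlinkML source.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">import scala.reflect.ClassTag

object FallbackSketch {
  // Simplified stand-ins for the FlinkML traits; Seq replaces DataSet.
  trait FitOperation[Self, Training] {
    def fit(instance: Self, input: Seq[Training]): Unit
  }

  trait Estimator[Self] { that: Self =&gt;
    def fit[Training](training: Seq[Training])(
        implicit fitOperation: FitOperation[Self, Training]): Unit =
      fitOperation.fit(this, training)
  }

  object Estimator {
    // Fallback that matches every (Self, Training) pair. Implicits defined in the
    // companion object of a class derived from Estimator take precedence over it.
    implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag]
        : FitOperation[Self, Training] = {
      val self = implicitly[ClassTag[Self]].runtimeClass.getName
      val training = implicitly[ClassTag[Training]].runtimeClass.getName
      new FitOperation[Self, Training] {
        def fit(instance: Self, input: Seq[Training]): Unit =
          throw new RuntimeException(
            s"There is no FitOperation defined for $self on input of type $training")
      }
    }
  }

  // Hypothetical estimator that remembers the largest value it has seen.
  class MaxEstimator extends Estimator[MaxEstimator] {
    var max: Option[Double] = None
  }

  object MaxEstimator {
    implicit val fitDoubles: FitOperation[MaxEstimator, Double] =
      new FitOperation[MaxEstimator, Double] {
        def fit(instance: MaxEstimator, input: Seq[Double]): Unit =
          instance.max = if (input.isEmpty) None else Some(input.max)
      }
  }

  def main(args: Array[String]): Unit = {
    val estimator = new MaxEstimator
    estimator.fit(Seq(1.0, 4.0)) // the dedicated operation is chosen
    try {
      estimator.fit(Seq("a", "b")) // compiles, but only the fallback matches
    } catch {
      case e: RuntimeException =&gt; println(e.getMessage)
    }
  }
}</code></pre></div>
<p>Because the fallback is an ordinary method, it can name both the operator and the unsupported input type
in its message, which is harder to achieve with a plain "implicit not found" compiler error.</p>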
<h3 id="chaining">Chaining</h3>
<p>Chaining is achieved by calling <code>chainTransformer</code> or <code>chainPredictor</code> on an object
of a class that implements <code>Transformer</code>. These methods return a <code>ChainedTransformer</code> or
<code>ChainedPredictor</code> object respectively. As we mentioned, <code>ChainedTransformer</code> objects can be
chained further, while <code>ChainedPredictor</code> objects cannot. These classes take care of applying
fit, transform, and predict operations for a pair of successive transformers or
a transformer and a predictor. They also act recursively if the length of the
chain is larger than two, since every <code>ChainedTransformer</code> defines a <code>transform</code> and <code>fit</code>
operation that can be further chained with more transformers or a predictor.</p>
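<p>The recursive structure can be sketched in a few lines. This is a deliberately reduced model: the traits
below work on <code>Seq[Double]</code> only and override methods directly, whereas the FlinkML classes are generic and
delegate to implicit operations. <code>Center</code> and <code>MaxAbsScale</code> are hypothetical transformers used for illustration.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">object ChainingSketch {
  // Simplified stand-in for a transformer that is fitted on and applied to Seq[Double].
  trait Transformer {
    def fit(input: Seq[Double]): Unit
    def transform(input: Seq[Double]): Seq[Double]

    // The result is itself a Transformer, so it can be chained again.
    def chainTransformer(next: Transformer): Transformer =
      new ChainedTransformer(this, next)
  }

  class ChainedTransformer(left: Transformer, right: Transformer) extends Transformer {
    def fit(input: Seq[Double]): Unit = {
      left.fit(input)
      // The right link is trained on the output of the already fitted left link.
      right.fit(left.transform(input))
    }

    def transform(input: Seq[Double]): Seq[Double] =
      right.transform(left.transform(input))
  }

  // Hypothetical transformer: subtracts the mean learned during fit.
  class Center extends Transformer {
    private var mean = 0.0
    def fit(input: Seq[Double]): Unit = {
      mean = if (input.isEmpty) 0.0 else input.sum / input.size
    }
    def transform(input: Seq[Double]): Seq[Double] = input.map(_ - mean)
  }

  // Hypothetical transformer: divides by the largest absolute value learned during fit.
  class MaxAbsScale extends Transformer {
    private var maxAbs = 1.0
    def fit(input: Seq[Double]): Unit = {
      val largest = input.foldLeft(0.0)((acc, x) =&gt; math.max(acc, math.abs(x)))
      maxAbs = if (largest == 0.0) 1.0 else largest
    }
    def transform(input: Seq[Double]): Seq[Double] = input.map(_ / maxAbs)
  }

  def main(args: Array[String]): Unit = {
    // Three links: the first chain is wrapped by a second ChainedTransformer.
    val pipeline = new Center()
      .chainTransformer(new MaxAbsScale())
      .chainTransformer(new Center())
    val data = Seq(1.0, 3.0, 5.0)
    pipeline.fit(data)
    println(pipeline.transform(data))
  }
}</code></pre></div>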
<p>It is important to note that developers and users do not need to worry about chaining when
implementing their algorithms; all of this is handled automatically by FlinkML.</p>
<h3 id="how-to-implement-a-pipeline-operator">How to Implement a Pipeline Operator</h3>
<p>In order to support FlinkML’s pipelining, algorithms have to adhere to a certain design pattern, which we will describe in this section.
Let’s assume that we want to implement a pipeline operator which changes the mean of the data.
Since centering data is a common pre-processing step in many analysis pipelines, we will implement it as a <code>Transformer</code>.
Therefore, we first create a <code>MeanTransformer</code> class which inherits from <code>Transformer</code>:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MeanTransformer</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">MeanTransformer</span><span class="o">]</span> <span class="o">{}</span></code></pre></div>
<p>Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MeanTransformer</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">MeanTransformer</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">setMean</span><span class="o">(</span><span class="n">mean</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span><span class="k">:</span> <span class="kt">this.</span><span class="k">type</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">parameters</span><span class="o">.</span><span class="n">add</span><span class="o">(</span><span class="nc">MeanTransformer</span><span class="o">.</span><span class="nc">Mean</span><span class="o">,</span> <span class="n">mean</span><span class="o">)</span>
<span class="k">this</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">object</span> <span class="nc">MeanTransformer</span> <span class="o">{</span>
<span class="k">case</span> <span class="k">object</span> <span class="nc">Mean</span> <span class="k">extends</span> <span class="nc">Parameter</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">val</span> <span class="n">defaultValue</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Some</span><span class="o">(</span><span class="mf">0.0</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">def</span> <span class="n">apply</span><span class="o">()</span><span class="k">:</span> <span class="kt">MeanTransformer</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">MeanTransformer</span>
<span class="o">}</span></code></pre></div>
<p>Parameters are defined in the companion object of the transformer class and extend the <code>Parameter</code> class.
Since the parameter instances are supposed to act as immutable keys for a parameter map, they should be implemented as <code>case objects</code>.
The default value will be used if no other value has been set by the user of this component.
If no default value has been specified, meaning that <code>defaultValue = None</code>, then the algorithm has to handle this situation accordingly.</p>
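<p>How case objects act as typed keys, and how the default value comes into play, can be illustrated with a
small stand-alone model. The <code>Parameter</code> and <code>ParameterMap</code> types below are simplified stand-ins written for this
sketch, not the FlinkML classes, and the <code>Name</code> parameter is hypothetical.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">import scala.collection.mutable

object ParameterSketch {
  // Simplified stand-in for a typed parameter key.
  trait Parameter[T] {
    val defaultValue: Option[T]
  }

  // Simplified stand-in for a parameter map keyed by Parameter instances.
  class ParameterMap {
    private val values = mutable.Map.empty[Parameter[_], Any]

    def add[T](parameter: Parameter[T], value: T): ParameterMap = {
      values(parameter) = value
      this
    }

    // An explicitly set value wins; otherwise the parameter's default is used.
    def get[T](parameter: Parameter[T]): Option[T] =
      values.get(parameter).map(_.asInstanceOf[T]).orElse(parameter.defaultValue)
  }

  // Case objects are singletons with stable equality, which makes them usable as keys.
  case object Mean extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(0.0)
  }

  // Hypothetical parameter without a default: callers must handle None.
  case object Name extends Parameter[String] {
    override val defaultValue: Option[String] = None
  }

  def main(args: Array[String]): Unit = {
    val parameters = new ParameterMap
    println(parameters.get(Mean)) // default value
    parameters.add(Mean, 2.5)
    println(parameters.get(Mean)) // value set by the user
    println(parameters.get(Name)) // no value and no default
  }
}</code></pre></div>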
<p>We can now instantiate a <code>MeanTransformer</code> object and set the mean value of the transformed data.
But we still have to implement how the transformation works.
The workflow can be separated into two phases.
Within the first phase, the transformer learns the mean of the given training data.
This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value.</p>
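<p>Stripped of all pipeline machinery, the two phases amount to the following arithmetic on scalar values.
The function names <code>learnMean</code> and <code>shift</code> are hypothetical and only serve this sketch.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">object MeanShiftSketch {
  // Phase 1 (fit): learn the mean of the training data.
  def learnMean(training: Seq[Double]): Double = {
    require(training.nonEmpty, "training data must not be empty")
    training.sum / training.size
  }

  // Phase 2 (transform): shift every value so that data with the learned mean
  // ends up with the configured target mean.
  def shift(data: Seq[Double], learnedMean: Double, targetMean: Double): Seq[Double] =
    data.map(x =&gt; x - learnedMean + targetMean)

  def main(args: Array[String]): Unit = {
    val training = Seq(1.0, 2.0, 3.0)
    val learned = learnMean(training)
    println(shift(training, learned, targetMean = 10.0))
  }
}</code></pre></div>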
<p>The learning of the mean can be implemented within the <code>fit</code> operation of our <code>Transformer</code>, which it inherited from <code>Estimator</code>.
Within the <code>fit</code> operation, a pipeline component is trained with respect to the given training data.
The algorithm is, however, <strong>not</strong> implemented by overriding the <code>fit</code> method but by providing an implementation of a corresponding <code>FitOperation</code> for the correct type.
Taking a look at the definition of the <code>fit</code> method in <code>Estimator</code>, which is the parent class of <code>Transformer</code>, reveals why this is the case.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">trait</span> <span class="nc">Estimator</span><span class="o">[</span><span class="kt">Self</span><span class="o">]</span> <span class="nc">extends</span> <span class="nc">WithParameters</span> <span class="k">with</span> <span class="nc">Serializable</span> <span class="o">{</span>
<span class="n">that</span><span class="k">:</span> <span class="kt">Self</span> <span class="o">=&gt;</span>
<span class="k">def</span> <span class="n">fit</span><span class="o">[</span><span class="kt">Training</span><span class="o">](</span>
<span class="n">training</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Training</span><span class="o">],</span>
<span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span> <span class="o">=</span> <span class="nc">ParameterMap</span><span class="o">.</span><span class="nc">Empty</span><span class="o">)</span>
<span class="o">(</span><span class="k">implicit</span> <span class="n">fitOperation</span><span class="k">:</span> <span class="kt">FitOperation</span><span class="o">[</span><span class="kt">Self</span>, <span class="kt">Training</span><span class="o">])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="nc">FlinkMLTools</span><span class="o">.</span><span class="n">registerFlinkMLTypes</span><span class="o">(</span><span class="n">training</span><span class="o">.</span><span class="n">getExecutionEnvironment</span><span class="o">)</span>
<span class="n">fitOperation</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="k">this</span><span class="o">,</span> <span class="n">fitParameters</span><span class="o">,</span> <span class="n">training</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>We see that the <code>fit</code> method is called with an input data set with elements of type <code>Training</code>, an optional parameter map and, in the second parameter list, an implicit parameter of type <code>FitOperation</code>.
Within the body of the function, first some machine learning types are registered and then the <code>fit</code> method of the <code>FitOperation</code> parameter is called.
The instance passes itself, the parameter map and the training data set as parameters to the method.
Thus, all the program logic takes place within the <code>FitOperation</code>.</p>
<p>The <code>FitOperation</code> has two type parameters.
The first defines the pipeline operator type for which this <code>FitOperation</code> works, and the second defines the type of the data set elements.
To make the <code>MeanTransformer</code> work on <code>DenseVector</code> first, we thus have to provide an implementation of <code>FitOperation[MeanTransformer, DenseVector]</code>.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">denseVectorMeanFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">fit</span><span class="o">(</span><span class="n">instance</span><span class="k">:</span> <span class="kt">MeanTransformer</span><span class="o">,</span> <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">])</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Breeze._</span>
<span class="k">val</span> <span class="n">meanTrainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="n">input</span>
<span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">asBreeze</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span>
<span class="o">.</span><span class="n">reduce</span><span class="o">{</span>
<span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="o">(</span><span class="n">left</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">left</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">p</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">p</span><span class="o">.</span><span class="n">_1</span><span class="o">/</span><span class="n">p</span><span class="o">.</span><span class="n">_2</span><span class="o">).</span><span class="n">fromBreeze</span> <span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>A <code>FitOperation[T, I]</code> has a <code>fit</code> method which is called with an instance of type <code>T</code>, a parameter map and an input <code>DataSet[I]</code>.
In our case <code>T=MeanTransformer</code> and <code>I=DenseVector</code>.
The parameter map is necessary if our fit step depends on parameter values which were not given directly when the <code>Transformer</code> was created.
The <code>FitOperation</code> of the <code>MeanTransformer</code> sums up the <code>DenseVector</code> instances of the given input data set and divides the result by the total number of vectors.
That way, we obtain a <code>DataSet[DenseVector]</code> with a single element, which is the mean vector.</p>
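<p>To make the data flow of this <code>FitOperation</code> easier to follow, here is a minimal sketch of the same map, reduce and map steps on a local Scala collection instead of a Flink <code>DataSet</code>.
The object <code>LocalMeanSketch</code> and its <code>mean</code> method are hypothetical names introduced only for this illustration, plain <code>Array[Double]</code> stands in for <code>DenseVector</code>, and the input is assumed to be non-empty with vectors of equal length.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">// Illustration only: local collections, no Flink or Breeze involved.
object LocalMeanSketch {
  // Mirrors the map / reduce / map steps of the FitOperation shown above.
  def mean(input: Seq[Array[Double]]): Array[Double] = {
    val (sum, count) = input
      .map(x =&gt; (x, 1)) // pair every vector with a count of 1
      .reduce { (left, right) =&gt;
        // element-wise sum of the vectors, plus the sum of the counts
        (left._1.zip(right._1).map { case (a, b) =&gt; a + b }, left._2 + right._2)
      }
    sum.map(_ / count) // divide the summed vector by the number of vectors
  }

  def main(args: Array[String]): Unit = {
    val result = mean(Seq(Array(1.0, 2.0), Array(3.0, 6.0)))
    println(result.mkString(", ")) // prints "2.0, 4.0"
  }
}</code></pre></div>
<p>As in the Flink version, carrying the count alongside the running sum lets a single reduce step produce both values needed for the final division.</p>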
<p>But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere.
If we want to use it in a later step to adjust the mean of some other input, we have to keep it around.
This is where the parameter of type <code>MeanTransformer</code>, which is passed to the <code>fit</code> method, comes into play.
We can use this instance to store state, which is then used by a subsequent <code>transform</code> operation on the same object.
But first we have to extend <code>MeanTransformer</code> with a member field and then adjust the <code>FitOperation</code> implementation.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MeanTransformer</span> <span class="k">extends</span> <span class="nc">Transformer</span><span class="o">[</span><span class="kt">MeanTransformer</span><span class="o">]</span> <span class="o">{</span>
<span class="k">var</span> <span class="n">meanOption</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]]</span> <span class="k">=</span> <span class="nc">None</span>
<span class="k">def</span> <span class="n">setMean</span><span class="o">(</span><span class="n">mean</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span><span class="k">:</span> <span class="kt">MeanTransformer</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">parameters</span><span class="o">.</span><span class="n">add</span><span class="o">(</span><span class="nc">MeanTransformer</span><span class="o">.</span><span class="nc">Mean</span><span class="o">,</span> <span class="n">mean</span><span class="o">)</span>
<span class="k">this</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">val</span> <span class="n">denseVectorMeanFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">fit</span><span class="o">(</span><span class="n">instance</span><span class="k">:</span> <span class="kt">MeanTransformer</span><span class="o">,</span> <span class="n">fitParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">])</span> <span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Breeze._</span>
<span class="n">instance</span><span class="o">.</span><span class="n">meanOption</span> <span class="k">=</span> <span class="nc">Some</span><span class="o">(</span><span class="n">input</span>
<span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">asBreeze</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span>
<span class="o">.</span><span class="n">reduce</span><span class="o">{</span>
<span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="o">(</span><span class="n">left</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">left</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">right</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">p</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">p</span><span class="o">.</span><span class="n">_1</span><span class="o">/</span><span class="n">p</span><span class="o">.</span><span class="n">_2</span><span class="o">).</span><span class="n">fromBreeze</span> <span class="o">})</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>If we look at the <code>transform</code> method in <code>Transformer</code>, we will see that we also need an implementation of <code>TransformOperation</code>.
A possible mean transforming implementation could look like the following.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">denseVectorMeanTransformOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">transform</span><span class="o">(</span>
<span class="n">instance</span><span class="k">:</span> <span class="kt">MeanTransformer</span><span class="o">,</span>
<span class="n">transformParameters</span><span class="k">:</span> <span class="kt">ParameterMap</span><span class="o">,</span>
<span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">])</span>
<span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">resultingParameters</span> <span class="k">=</span> <span class="n">instance</span><span class="o">.</span><span class="n">parameters</span> <span class="o">++</span> <span class="n">transformParameters</span>
<span class="k">val</span> <span class="n">resultingMean</span> <span class="k">=</span> <span class="n">resultingParameters</span><span class="o">(</span><span class="nc">MeanTransformer</span><span class="o">.</span><span class="nc">Mean</span><span class="o">)</span>
<span class="n">instance</span><span class="o">.</span><span class="n">meanOption</span> <span class="k">match</span> <span class="o">{</span>
<span class="k">case</span> <span class="nc">Some</span><span class="o">(</span><span class="n">trainingMean</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">{</span>
<span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="k">new</span> <span class="nc">MeanTransformMapper</span><span class="o">(</span><span class="n">resultingMean</span><span class="o">)</span> <span class="o">}.</span><span class="n">withBroadcastSet</span><span class="o">(</span><span class="n">trainingMean</span><span class="o">,</span> <span class="s">&quot;trainingMean&quot;</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">case</span> <span class="nc">None</span> <span class="k">=&gt;</span> <span class="k">throw</span> <span class="k">new</span> <span class="nc">RuntimeException</span><span class="o">(</span><span class="s">&quot;MeanTransformer has not been fitted to data.&quot;</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">class</span> <span class="nc">MeanTransformMapper</span><span class="o">(</span><span class="n">resultingMean</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">RichMapFunction</span><span class="o">[</span><span class="kt">DenseVector</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">{</span>
<span class="k">var</span> <span class="n">trainingMean</span><span class="k">:</span> <span class="kt">DenseVector</span> <span class="o">=</span> <span class="kc">null</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">open</span><span class="o">(</span><span class="n">parameters</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">trainingMean</span> <span class="k">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="n">getBroadcastVariable</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">](</span><span class="s">&quot;trainingMean&quot;</span><span class="o">).</span><span class="n">get</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">vector</span><span class="k">:</span> <span class="kt">DenseVector</span><span class="o">)</span><span class="k">:</span> <span class="kt">DenseVector</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">import</span> <span class="nn">org.apache.flink.ml.math.Breeze._</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">vector</span><span class="o">.</span><span class="n">asBreeze</span> <span class="o">-</span> <span class="n">trainingMean</span><span class="o">.</span><span class="n">asBreeze</span> <span class="o">+</span> <span class="n">resultingMean</span>
<span class="n">result</span><span class="o">.</span><span class="n">fromBreeze</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Now we have everything implemented to fit our <code>MeanTransformer</code> to a training data set of <code>DenseVector</code> instances and to transform them.
However, when we execute the <code>fit</code> operation</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">trainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">meanTransformer</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">MeanTransformer</span><span class="o">()</span>
<span class="n">meanTransformer</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">trainingData</span><span class="o">)</span></code></pre></div>
<p>we receive the following error at runtime: <code>"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"</code>.
The reason is that the Scala compiler could not find a matching <code>FitOperation</code> value with the right type parameters for the implicit parameter of the <code>fit</code> method.
Therefore, it chose a fallback implicit value, which produces this error message at runtime.
In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of the <code>MeanTransformer</code>’s companion object.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">MeanTransformer</span><span class="o">{</span>
<span class="k">implicit</span> <span class="k">val</span> <span class="n">denseVectorMeanFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">...</span>
<span class="k">implicit</span> <span class="k">val</span> <span class="n">denseVectorMeanTransformOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">DenseVector</span>, <span class="kt">DenseVector</span><span class="o">]</span> <span class="o">...</span>
<span class="o">}</span></code></pre></div>
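<p>The implicit lookup described above can be reproduced without Flink.
The following is a minimal sketch, assuming Scala 2 implicit resolution; <code>FitOp</code>, <code>LowPriorityFitOps</code> and <code>Mean</code> are hypothetical, simplified stand-ins and not the FlinkML classes.
It shows a generic fallback implicit that matches every type combination, and a specific implicit value in a companion object that the compiler prefers whenever the types line up.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala">// Illustration only: simplified stand-ins, not the FlinkML API.
object ImplicitFallbackSketch {
  trait FitOp[Self, Training] {
    def fit(instance: Self, input: Seq[Training]): Unit
  }

  // Generic fallback: matches every type combination and fails when invoked.
  trait LowPriorityFitOps {
    implicit def fallbackFitOp[Self, Training]: FitOp[Self, Training] =
      new FitOp[Self, Training] {
        override def fit(instance: Self, input: Seq[Training]): Unit =
          throw new RuntimeException("There is no FitOp defined for this type combination.")
      }
  }

  object FitOp extends LowPriorityFitOps

  class Mean {
    var mean: Option[Double] = None
    def fit[Training](input: Seq[Training])(implicit op: FitOp[Mean, Training]): Unit =
      op.fit(this, input)
  }

  object Mean {
    // Defined in the companion object, so it is in implicit scope for FitOp[Mean, Double].
    implicit val doubleFitOp: FitOp[Mean, Double] = new FitOp[Mean, Double] {
      override def fit(instance: Mean, input: Seq[Double]): Unit =
        instance.mean = Some(input.sum / input.size)
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new Mean
    m.fit(Seq(1.0, 2.0, 3.0)) // resolves to Mean.doubleFitOp
    println(m.mean)           // prints "Some(2.0)"
    // new Mean().fit(Seq("a")) compiles as well, but resolves to the fallback and throws.
  }
}</code></pre></div>
<p>Because the fallback always type-checks, a missing operation surfaces as an exception when <code>fit</code> is invoked rather than as a compile error, which matches the behaviour described above.</p>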
<p>Now we can call <code>fit</code> and <code>transform</code> on our <code>MeanTransformer</code> with <code>DataSet[DenseVector]</code> as input.
Furthermore, we can now use this transformer as part of an analysis pipeline wherever a <code>DenseVector</code> is both the input and the expected output.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">trainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">DenseVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">mean</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">MeanTransformer</span><span class="o">().</span><span class="n">setMean</span><span class="o">(</span><span class="mf">1.0</span><span class="o">)</span>
<span class="k">val</span> <span class="n">polyFeatures</span> <span class="k">=</span> <span class="nc">PolynomialFeatures</span><span class="o">().</span><span class="n">setDegree</span><span class="o">(</span><span class="mi">3</span><span class="o">)</span>
<span class="k">val</span> <span class="n">pipeline</span> <span class="k">=</span> <span class="n">mean</span><span class="o">.</span><span class="n">chainTransformer</span><span class="o">(</span><span class="n">polyFeatures</span><span class="o">)</span>
<span class="n">pipeline</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">trainingData</span><span class="o">)</span></code></pre></div>
<p>It is noteworthy that there is no additional code needed to enable chaining.
The system automatically constructs the pipeline logic using the operations of the individual components.</p>
<p>So far everything works fine with <code>DenseVector</code>.
But what happens if we call our transformer with <code>LabeledVector</code> instead?</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">trainingData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">LabeledVector</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">mean</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">MeanTransformer</span><span class="o">()</span>
<span class="n">mean</span><span class="o">.</span><span class="n">fit</span><span class="o">(</span><span class="n">trainingData</span><span class="o">)</span></code></pre></div>
<p>As before, we see the following exception upon execution of the program: <code>"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"</code>.
It is noteworthy that this exception is thrown in the pre-flight phase, which means that the job has not yet been submitted to the runtime system.
This has the advantage that you won’t see a job which runs for a couple of days and then fails because of an incompatible pipeline component.
Type compatibility is thus checked at the very beginning for the complete job.</p>
<p>In order to make the <code>MeanTransformer</code> work on <code>LabeledVector</code> as well, we have to provide the corresponding operations.
Consequently, we have to define a <code>FitOperation[MeanTransformer, LabeledVector]</code> and <code>TransformOperation[MeanTransformer, LabeledVector, LabeledVector]</code> as implicit values in the scope of <code>MeanTransformer</code>’s companion object.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">MeanTransformer</span> <span class="o">{</span>
<span class="k">implicit</span> <span class="k">val</span> <span class="n">labeledVectorFitOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">FitOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">LabeledVector</span><span class="o">]</span> <span class="o">...</span>
<span class="k">implicit</span> <span class="k">val</span> <span class="n">labeledVectorTransformOperation</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">TransformOperation</span><span class="o">[</span><span class="kt">MeanTransformer</span>, <span class="kt">LabeledVector</span>, <span class="kt">LabeledVector</span><span class="o">]</span> <span class="o">...</span>
<span class="o">}</span></code></pre></div>
<p>If we wanted to implement a <code>Predictor</code> instead of a <code>Transformer</code>, then we would have to provide a <code>FitOperation</code>, too.
Moreover, a <code>Predictor</code> requires a <code>PredictOperation</code> which implements how predictions are calculated from testing data.</p>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script>
</body>
</html>