blob: c677962221022fb98a6b37b84ee51de5dd18915e [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.9.0 Documentation: Best Practices</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Best Practices</h1>
<p><a href="#top"></a></p>
<p>This page contains a collection of best practices for Flink programmers on how to solve frequently encountered problems.</p>
<ul id="markdown-toc">
<li><a href="#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application" id="markdown-toc-parsing-command-line-arguments-and-passing-them-around-in-your-flink-application">Parsing command line arguments and passing them around in your Flink application</a> <ul>
<li><a href="#getting-your-configuration-values-into-the-parametertool" id="markdown-toc-getting-your-configuration-values-into-the-parametertool">Getting your configuration values into the <code>ParameterTool</code></a></li>
<li><a href="#using-the-parameters-in-your-flink-program" id="markdown-toc-using-the-parameters-in-your-flink-program">Using the parameters in your Flink program</a></li>
</ul>
</li>
<li><a href="#naming-large-tuplex-types" id="markdown-toc-naming-large-tuplex-types">Naming large TupleX types</a></li>
<li><a href="#register-a-custom-serializer-for-your-flink-program" id="markdown-toc-register-a-custom-serializer-for-your-flink-program">Register a custom serializer for your Flink program</a></li>
</ul>
<h2 id="parsing-command-line-arguments-and-passing-them-around-in-your-flink-application">Parsing command line arguments and passing them around in your Flink application</h2>
<p>Almost all Flink applications, both batch and streaming, rely on external configuration parameters.
They are used, for example, to specify input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application-specific parameters (often used within the user functions).</p>
<p>Since version 0.9, Flink provides a simple utility called <code>ParameterTool</code> that offers at least some basic tooling for solving these problems.</p>
<p>Please note that you don’t have to use the <code>ParameterTool</code> explained here. Other frameworks such as <a href="https://commons.apache.org/proper/commons-cli/">Commons CLI</a>,
<a href="http://argparse4j.sourceforge.net/">argparse4j</a> and others also work well with Flink.</p>
<h3 id="getting-your-configuration-values-into-the-parametertool">Getting your configuration values into the <code>ParameterTool</code></h3>
<p>The <code>ParameterTool</code> provides a set of predefined static methods for reading the configuration. Internally, the tool expects a <code>Map&lt;String, String&gt;</code>, so it’s very easy to integrate it with your own configuration style.</p>
<h4 id="from-properties-files">From <code>.properties</code> files</h4>
<p>The following method will read a <a href="https://docs.oracle.com/javase/tutorial/essential/environment/properties.html">Properties</a> file and provide the key/value pairs:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">String</span> <span class="n">propertiesFile</span> <span class="o">=</span> <span class="s">&quot;/home/sam/flink/myjob.properties&quot;</span><span class="o">;</span>
<span class="n">ParameterTool</span> <span class="n">parameter</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromPropertiesFile</span><span class="o">(</span><span class="n">propertiesFile</span><span class="o">);</span></code></pre></div>
<h4 id="from-the-command-line-arguments">From the command line arguments</h4>
<p>This allows getting arguments like <code>--input hdfs:///mydata --elements 42</code> from the command line.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
<span class="n">ParameterTool</span> <span class="n">parameter</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">);</span>
<span class="c1">// .. regular code ..</span></code></pre></div>
<h4 id="from-system-properties">From system properties</h4>
<p>When starting a JVM, you can pass system properties to it: <code>-Dinput=hdfs:///mydata</code>. You can also initialize the <code>ParameterTool</code> from these system properties:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ParameterTool</span> <span class="n">parameter</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromSystemProperties</span><span class="o">();</span></code></pre></div>
<h3 id="using-the-parameters-in-your-flink-program">Using the parameters in your Flink program</h3>
<p>Now that we’ve got the parameters from somewhere (see above) we can use them in various ways.</p>
<p><strong>Directly from the <code>ParameterTool</code></strong></p>
<p>The <code>ParameterTool</code> itself has methods for accessing the values.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ParameterTool</span> <span class="n">parameter</span> <span class="o">=</span> <span class="c1">// ...</span>
<span class="n">parameter</span><span class="o">.</span><span class="na">getRequired</span><span class="o">(</span><span class="s">&quot;input&quot;</span><span class="o">);</span>
<span class="n">parameter</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;output&quot;</span><span class="o">,</span> <span class="s">&quot;myDefaultValue&quot;</span><span class="o">);</span>
<span class="n">parameter</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="s">&quot;expectedCount&quot;</span><span class="o">,</span> <span class="o">-</span><span class="mi">1L</span><span class="o">);</span>
<span class="n">parameter</span><span class="o">.</span><span class="na">getNumberOfParameters</span><span class="o">();</span>
<span class="c1">// .. there are more methods available.</span></code></pre></div>
<p>You can use the return values of these methods directly in the <code>main()</code> method (that is, in the client submitting the application).
For example, you could set the parallelism of an operator like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ParameterTool</span> <span class="n">parameters</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">);</span>
<span class="kt">int</span> <span class="n">parallelism</span> <span class="o">=</span> <span class="n">parameters</span><span class="o">.</span><span class="na">getInt</span><span class="o">(</span><span class="s">&quot;mapParallelism&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">counts</span> <span class="o">=</span> <span class="n">text</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()).</span><span class="na">setParallelism</span><span class="o">(</span><span class="n">parallelism</span><span class="o">);</span></code></pre></div>
<p>Since the <code>ParameterTool</code> is serializable, you can pass it to the functions themselves:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ParameterTool</span> <span class="n">parameters</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">counts</span> <span class="o">=</span> <span class="n">text</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">(</span><span class="n">parameters</span><span class="o">));</span></code></pre></div>
<p>and then use them inside the function for getting values from the command line.</p>
<h4 id="passing-it-as-a-configuration-object-to-single-functions">Passing it as a <code>Configuration</code> object to single functions</h4>
<p>The example below shows how to pass the parameters as a <code>Configuration</code> object to a user defined function.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ParameterTool</span> <span class="n">parameters</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">counts</span> <span class="o">=</span> <span class="n">text</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()).</span><span class="na">withParameters</span><span class="o">(</span><span class="n">parameters</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">());</span></code></pre></div>
<p>In the <code>Tokenizer</code>, the object is now accessible in the <code>open(Configuration conf)</code> method:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">extends</span> <span class="n">RichFlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">getInteger</span><span class="o">(</span><span class="s">&quot;myInt&quot;</span><span class="o">,</span> <span class="o">-</span><span class="mi">1</span><span class="o">);</span>
<span class="c1">// .. do</span></code></pre></div>
<h4 id="register-the-parameters-globally">Register the parameters globally</h4>
<p>Parameters registered as a <a href="programming_guide.html#passing-parameters-to-functions">global job parameter</a> at the <code>ExecutionConfig</code> allow you to access the configuration values from the JobManager web interface and all functions defined by the user.</p>
<p><strong>Register the parameters globally</strong></p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ParameterTool</span> <span class="n">parameters</span> <span class="o">=</span> <span class="n">ParameterTool</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">);</span>
<span class="c1">// set up the execution environment</span>
<span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">setGlobalJobParameters</span><span class="o">(</span><span class="n">parameters</span><span class="o">);</span></code></pre></div>
<p>Access them in any rich user function:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">extends</span> <span class="n">RichFlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="n">ParameterTool</span> <span class="n">parameters</span> <span class="o">=</span> <span class="o">(</span><span class="n">ParameterTool</span><span class="o">)</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getExecutionConfig</span><span class="o">().</span><span class="na">getGlobalJobParameters</span><span class="o">();</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">getRequired</span><span class="o">(</span><span class="s">&quot;input&quot;</span><span class="o">);</span>
<span class="c1">// .. do more ..</span></code></pre></div>
<h2 id="naming-large-tuplex-types">Naming large TupleX types</h2>
<p>It is recommended to use POJOs (Plain old Java objects) instead of <code>TupleX</code> for data types with many fields.
Also, POJOs can be used to give large <code>Tuple</code>-types a name.</p>
<p><strong>Example</strong></p>
<p>Instead of using:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">Tuple11</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="o">...,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">var</span> <span class="o">=</span> <span class="k">new</span> <span class="o">...;</span></code></pre></div>
<p>It is much easier to create a custom type extending from the large Tuple type.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">CustomType</span> <span class="n">var</span> <span class="o">=</span> <span class="k">new</span> <span class="o">...;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomType</span> <span class="kd">extends</span> <span class="n">Tuple11</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="o">...,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">// constructor matching super</span>
<span class="o">}</span></code></pre></div>
<h2 id="register-a-custom-serializer-for-your-flink-program">Register a custom serializer for your Flink program</h2>
<p>If you use a custom type in your Flink program which cannot be serialized by the
Flink type serializer, Flink falls back to using the generic Kryo
serializer. You may register your own serializer or a serialization system like
Google Protobuf or Apache Thrift with Kryo. To do that, simply register the type
class and the serializer in the <code>ExecutionConfig</code> of your Flink program.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// register the class of the serializer as serializer for a type</span>
<span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">registerTypeWithKryoSerializer</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">MyCustomSerializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="c1">// register an instance as serializer for a type</span>
<span class="n">MySerializer</span> <span class="n">mySerializer</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">MySerializer</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">registerTypeWithKryoSerializer</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">mySerializer</span><span class="o">);</span></code></pre></div>
<p>Note that your custom serializer has to extend Kryo’s Serializer class. In the
case of Google Protobuf or Apache Thrift, this has already been done for
you:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// register the Google Protobuf serializer with Kryo</span>
<span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">registerTypeWithKryoSerializer</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">ProtobufSerializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="c1">// register the serializer included with Apache Thrift as the standard serializer</span>
<span class="c1">// TBaseSerializer states it should be initialized as a default Kryo serializer</span>
<span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">addDefaultKryoSerializer</span><span class="o">(</span><span class="n">MyCustomType</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">TBaseSerializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span></code></pre></div>
<p>For the above example to work, you need to include the necessary dependencies in
your Maven project file (pom.xml). In the dependency section, add the following
for Apache Thrift:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>com.twitter<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>chill-thrift<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.5.2<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="c">&lt;!-- libthrift is required by chill-thrift --&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.thrift<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>libthrift<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.6.1<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;exclusions&gt;</span>
<span class="nt">&lt;exclusion&gt;</span>
<span class="nt">&lt;groupId&gt;</span>javax.servlet<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>servlet-api<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;/exclusion&gt;</span>
<span class="nt">&lt;exclusion&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.httpcomponents<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>httpclient<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;/exclusion&gt;</span>
<span class="nt">&lt;/exclusions&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p>For Google Protobuf you need the following Maven dependency:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>com.twitter<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>chill-protobuf<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.5.2<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="c">&lt;!-- We need protobuf for chill-protobuf --&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>com.google.protobuf<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>protobuf-java<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>2.5.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p>Please adjust the versions of both libraries as needed.</p>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
// Google Analytics (analytics.js) asynchronous loader.
// The IIFE below:
//   - records the name of the command function ('ga') in window.GoogleAnalyticsObject,
//   - defines window.ga (unless it already exists) as a stub that appends each
//     call's arguments to the ga.q array,
//   - stores the current timestamp in ga.l,
//   - creates an async <script> element pointing at the analytics library and
//     inserts it before the first <script> element already in the document.
// The library URL is explicitly https:// (not protocol-relative) so the script
// is never fetched over plain HTTP and still resolves when the page is opened
// from a file:// URL.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Issue the 'create' command for property UA-52545728-1 with the 'auto' setting.
ga('create', 'UA-52545728-1', 'auto');
// Issue the 'send' command for a 'pageview' hit for the current page.
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
// Disqus forum shortname; it is used below to build the embed script's host name.
// NOTE(review): declared with a top-level `var`, so it is also a global —
// presumably the loaded embed.js reads it; keep it global unless confirmed otherwise.
var disqus_shortname = 'stratosphere-eu';
(function() {
  // Create an asynchronously loaded <script> element for the Disqus embed code.
  var dsq = document.createElement('script');
  dsq.type = 'text/javascript';
  dsq.async = true;
  // Use an explicit https:// URL rather than a protocol-relative one so the
  // script is never fetched over plain HTTP and still resolves when the page
  // is opened from a file:// URL.
  dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
  // Attach to <head> when present, otherwise fall back to <body>.
  var host = document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0];
  host.appendChild(dsq);
})();
</script>
</body>
</html>