blob: a31be1ac5a604c7a3b4632f06c58faac2c9ff414 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.10-SNAPSHOT Documentation: Connecting to other systems</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatability <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Connecting to other systems</h1>
<h2 id="reading-from-filesystems">Reading from filesystems.</h2>
<p>Flink has build-in support for the following file systems:</p>
<table>
<thead>
<tr>
<th>Filesystem</th>
<th>Since</th>
<th>Scheme</th>
<th>Notes</th>
</tr>
</thead>
<tbody>
<tr>
<td>Hadoop Distributed File System (HDFS)</td>
<td>0.2</td>
<td><code>hdfs://</code></td>
<td>All HDFS versions are supported</td>
</tr>
<tr>
<td>Amazon S3</td>
<td>0.2</td>
<td><code>s3://</code></td>
<td>Ā </td>
</tr>
<tr>
<td>MapR file system</td>
<td>0.7-incubating</td>
<td><code>maprfs://</code></td>
<td>The user has to manually place the required jar files in the <code>lib/</code> dir</td>
</tr>
<tr>
<td>Tachyon</td>
<td>0.9</td>
<td><code>tachyon://</code></td>
<td>Support through Hadoop file system implementation (see below)</td>
</tr>
</tbody>
</table>
<h3 id="using-hadoop-file-systems-with-apache-flink">Using Hadoop file systems with Apache Flink</h3>
<p>Apache Flink allows users to use any file system implementing the <code>org.apache.hadoop.fs.FileSystem</code>
interface. Hadoop ships adapters for FTP, <a href="http://hadoop.apache.org/docs/r1.2.1/hftp.html">Hftp</a>, and others.</p>
<p>Flink has integrated testcases to validate the integration with <a href="http://tachyon-project.org/">Tachyon</a>.
Other file systems we tested the integration is the
<a href="https://cloud.google.com/hadoop/google-cloud-storage-connector">Google Cloud Storage Connector for Hadoop</a> and <a href="http://www.xtreemfs.org/">XtreemFS</a>.</p>
<p>In order to use a Hadoop file system with Flink, make sure that the <code>flink-conf.yaml</code> has set the
<code>fs.hdfs.hadoopconf</code> property set to the Hadoop configuration directory.
In addition to that, the Hadoop configuration (in that directory) needs to have an entry for each supported file system.
For example for tachyon support, there must be the following entry in the <code>core-site.xml</code> file:</p>
<div class="highlight"><pre><code class="language-xml"><span class="nt">&lt;property&gt;</span>
<span class="nt">&lt;name&gt;</span>fs.tachyon.impl<span class="nt">&lt;/name&gt;</span>
<span class="nt">&lt;value&gt;</span>tachyon.hadoop.TFS<span class="nt">&lt;/value&gt;</span>
<span class="nt">&lt;/property&gt;</span></code></pre></div>
<p>Also, the required classes for using the file system need to be placed in the <code>lib/</code> folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink is also respecting the <code>HADOOP_CLASSPATH</code> environment variable to add Hadoop jar files to the classpath.</p>
<h2 id="connecting-to-other-systems-using-input--output-format-wrappers-for-hadoop">Connecting to other systems using Input / Output Format wrappers for Hadoop</h2>
<p>Apache Flink allows users to access many different systems as data sources or sinks.
The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
of so called <code>InputFormat</code>s and <code>OutputFormat</code>s.</p>
<p>One implementation of these <code>InputFormat</code>s is the <code>HadoopInputFormat</code>. This is a wrapper that allows
users to use all existing Hadoop input formats with Flink.</p>
<p>This section shows some examples for connecting Flink to other systems.
<a href="hadoop_compatibility.html">Read more about Hadoop compatibility in Flink</a>.</p>
<h2 id="avro-support-in-flink">Avro support in Flink</h2>
<p>Flink has extensive build-in support for <a href="http://avro.apache.org/">Apache Avro</a>. This allows to easily read from Avro files with Flink.
Also, the serialization framework of Flink is able to handle classes generated from Avro schemas.</p>
<p>In order to read data from an Avro file, you have to specify an <code>AvroInputFormat</code>.</p>
<p><strong>Example</strong>:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">AvroInputFormat</span><span class="o">&lt;</span><span class="n">User</span><span class="o">&gt;</span> <span class="n">users</span> <span class="o">=</span> <span class="k">new</span> <span class="n">AvroInputFormat</span><span class="o">&lt;</span><span class="n">User</span><span class="o">&gt;(</span><span class="n">in</span><span class="o">,</span> <span class="n">User</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">User</span><span class="o">&gt;</span> <span class="n">usersDS</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">users</span><span class="o">);</span></code></pre></div>
<p>Note that <code>User</code> is a POJO generated by Avro. Flink also allows to perform string-based key selection of these POJOs. For example:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">usersDS</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">)</span></code></pre></div>
<p>Note that using the <code>GenericData.Record</code> type is possible with Flink, but not recommended. Since the record contains the full schema, its very data intensive and thus probably slow to use.</p>
<p>Flinkā€™s POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type <code>Object</code> you can not use the field as a join or grouping key.
Specifying a field in Avro like this <code>{"name": "type_double_test", "type": "double"},</code> works fine, however specifying it as a UNION-type with only one field (<code>{"name": "type_double_test", "type": ["double"]},</code>) will generate a field of type <code>Object</code>. Note that specifying nullable types (<code>{"name": "type_double_test", "type": ["null", "double"]},</code>) is possible!</p>
<h3 id="access-microsoft-azure-table-storage">Access Microsoft Azure Table Storage</h3>
<p><em>Note: This example works starting from Flink 0.6-incubating</em></p>
<p>This example is using the <code>HadoopInputFormat</code> wrapper to use an existing Hadoop input format implementation for accessing <a href="https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/">Azureā€™s Table Storage</a>.</p>
<ol>
<li>
<p>Download and compile the <code>azure-tables-hadoop</code> project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves.
Execute the following commands:</p>
<div class="highlight"><pre><code class="language-bash">git clone https://github.com/mooso/azure-tables-hadoop.git
<span class="nb">cd </span>azure-tables-hadoop
mvn clean install</code></pre></div>
</li>
<li>
<p>Setup a new Flink project using the quickstarts:</p>
<div class="highlight"><pre><code class="language-bash">curl http://flink.apache.org/q/quickstart.sh <span class="p">|</span> bash</code></pre></div>
</li>
<li>
<p>Add the following dependencies (in the <code>&lt;dependencies&gt;</code> section) to your <code>pom.xml</code> file:</p>
<div class="highlight"><pre><code class="language-xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-hadoop-compatibility<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>com.microsoft.hadoop<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>microsoft-hadoop-azure<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.0.4<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p><code>flink-hadoop-compatibility</code> is a Flink package that provides the Hadoop input format wrappers.
<code>microsoft-hadoop-azure</code> is adding the project weā€™ve build before to our project.</p>
</li>
</ol>
<p>The project is now prepared for starting to code. We recommend to import the project into an IDE, such as Eclipse or IntelliJ. (Import as a Maven project!).
Browse to the code of the <code>Job.java</code> file. Its an empty skeleton for a Flink job.</p>
<p>Paste the following code into it:</p>
<div class="highlight"><pre><code class="language-java"><span class="kn">import</span> <span class="nn">java.util.Map</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.api.common.functions.MapFunction</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.api.java.DataSet</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.api.java.ExecutionEnvironment</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.api.java.tuple.Tuple2</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.hadoop.io.Text</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.hadoop.mapreduce.Job</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">com.microsoft.hadoop.azure.AzureTableConfiguration</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">com.microsoft.hadoop.azure.AzureTableInputFormat</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">com.microsoft.hadoop.azure.WritableEntity</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">com.microsoft.windowsazure.storage.table.EntityProperty</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">AzureTableExample</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="c1">// set up the execution environment</span>
<span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// create a AzureTableInputFormat, using a Hadoop input format wrapper</span>
<span class="n">HadoopInputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">&gt;</span> <span class="n">hdIf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">&gt;(</span><span class="k">new</span> <span class="nf">AzureTableInputFormat</span><span class="o">(),</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Job</span><span class="o">());</span>
<span class="c1">// set the Account URI, something like: https://apacheflink.table.core.windows.net</span>
<span class="n">hdIf</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="n">AzureTableConfiguration</span><span class="o">.</span><span class="na">Keys</span><span class="o">.</span><span class="na">ACCOUNT_URI</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="s">&quot;TODO&quot;</span><span class="o">);</span>
<span class="c1">// set the secret storage key here</span>
<span class="n">hdIf</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="n">AzureTableConfiguration</span><span class="o">.</span><span class="na">Keys</span><span class="o">.</span><span class="na">STORAGE_KEY</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="s">&quot;TODO&quot;</span><span class="o">);</span>
<span class="c1">// set the table name here</span>
<span class="n">hdIf</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="n">AzureTableConfiguration</span><span class="o">.</span><span class="na">Keys</span><span class="o">.</span><span class="na">TABLE_NAME</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="s">&quot;TODO&quot;</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">hdIf</span><span class="o">);</span>
<span class="c1">// a little example how to use the data in a mapper.</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">fin</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span><span class="n">WritableEntity</span><span class="o">&gt;,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">WritableEntity</span><span class="o">&gt;</span> <span class="n">arg0</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">System</span><span class="o">.</span><span class="na">err</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;--------------------------------\nKey = &quot;</span><span class="o">+</span><span class="n">arg0</span><span class="o">.</span><span class="na">f0</span><span class="o">);</span>
<span class="n">WritableEntity</span> <span class="n">we</span> <span class="o">=</span> <span class="n">arg0</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
<span class="k">for</span><span class="o">(</span><span class="n">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">EntityProperty</span><span class="o">&gt;</span> <span class="n">prop</span> <span class="o">:</span> <span class="n">we</span><span class="o">.</span><span class="na">getProperties</span><span class="o">().</span><span class="na">entrySet</span><span class="o">())</span> <span class="o">{</span>
<span class="n">System</span><span class="o">.</span><span class="na">err</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;key=&quot;</span><span class="o">+</span><span class="n">prop</span><span class="o">.</span><span class="na">getKey</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot; ; value (asString)=&quot;</span><span class="o">+</span><span class="n">prop</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getValueAsString</span><span class="o">());</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">arg0</span><span class="o">.</span><span class="na">f0</span><span class="o">.</span><span class="na">toString</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">});</span>
<span class="c1">// emit result (this works only locally)</span>
<span class="n">fin</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="c1">// execute program</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Azure Example&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>The example shows how to access an Azure table and turn data into Flinkā€™s <code>DataSet</code> (more specifically, the type of the set is <code>DataSet&lt;Tuple2&lt;Text, WritableEntity&gt;&gt;</code>). With the <code>DataSet</code>, you can apply all known transformations to the DataSet.</p>
<h2 id="access-mongodb">Access MongoDB</h2>
<p>This <a href="https://github.com/okkam-it/flink-mongodb-test">GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)</a>.</p>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
var disqus_shortname = 'stratosphere-eu';
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</body>
</html>