---
title: "Connecting to other systems (Batch)"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Reading from file systems

Flink has built-in support for the following file systems:

| Filesystem                            | Scheme       | Notes |
| ------------------------------------- | ------------ | ----- |
| Hadoop Distributed File System (HDFS) | `hdfs://`    | All HDFS versions are supported |
| Amazon S3                             | `s3://`      | Support through Hadoop file system implementation (see below) |
| MapR file system                      | `maprfs://`  | The user has to manually place the required jar files in the `lib/` dir |
| Tachyon                               | `tachyon://` | Support through Hadoop file system implementation (see below) |

### Using Hadoop file system implementations

Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
interface. There are Hadoop `FileSystem` implementations for

- [S3](https://aws.amazon.com/s3/) (tested)
- [Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector) (tested)
- [Tachyon](http://tachyon-project.org/) (tested)
- [XtreemFS](http://www.xtreemfs.org/) (tested)
- HDFS over HTTP via [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html) (read-only, not tested)
- and many more.

In order to use a Hadoop file system with Flink, make sure that
- the `flink-conf.yaml` has the `fs.hdfs.hadoopconf` property set to the Hadoop configuration directory (see the example below).
- the Hadoop configuration (in that directory) has an entry for the required file system. Examples for S3 and Tachyon are shown below.
- the required classes for using the file system are available in the `lib/` folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink also respects the `HADOOP_CLASSPATH` environment variable to add Hadoop jar files to the classpath.
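
For example, the `flink-conf.yaml` entry could look like the following sketch (the path is a placeholder for your Hadoop configuration directory):

~~~yaml
# directory containing core-site.xml, hdfs-site.xml, etc.
fs.hdfs.hadoopconf: /path/to/etc/hadoop
~~~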

#### Amazon S3

For Amazon S3 support, add the following entries to the `core-site.xml` file:

~~~xml
<!-- configure the file system implementation -->
<property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

<!-- set your AWS access key ID -->
<property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>putKeyHere</value>
</property>

<!-- set your AWS secret access key -->
<property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>putSecretHere</value>
</property>
~~~
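
Once the file system is registered, `s3://` paths can be used like any other input or output path. A minimal sketch, assuming a bucket named `my-bucket` and placeholder object paths:

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3Example {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "my-bucket" and the object paths are placeholders
        DataSet<String> lines = env.readTextFile("s3://my-bucket/path/to/input.txt");
        lines.writeAsText("s3://my-bucket/path/to/output");

        env.execute("S3 example");
    }
}
~~~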

#### Tachyon

For Tachyon support, add the following entry to the `core-site.xml` file:

~~~xml
<property>
    <name>fs.tachyon.impl</name>
    <value>tachyon.hadoop.TFS</value>
</property>
~~~
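
With this entry in place, `tachyon://` paths (for example, `tachyon://host:port/path`) can be used in the same way as the `s3://` paths shown above.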


## Connecting to other systems using Input/OutputFormat wrappers for Hadoop

Apache Flink allows users to access many different systems as data sources or sinks.
The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
of so-called `InputFormat`s and `OutputFormat`s.

One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
users to use all existing Hadoop input formats with Flink.
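
For example, Hadoop's `TextInputFormat` can be wrapped and used with Flink as follows. This is a minimal sketch; the class name and the HDFS paths are placeholders:

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HadoopTextInputExample {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // wrap Hadoop's TextInputFormat; keys are byte offsets, values are text lines
        Job job = Job.getInstance();
        HadoopInputFormat<LongWritable, Text> hadoopIF =
                new HadoopInputFormat<LongWritable, Text>(
                        new TextInputFormat(), LongWritable.class, Text.class, job);
        FileInputFormat.addInputPath(job, new Path("hdfs:///path/to/input"));

        DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
        lines.writeAsText("hdfs:///path/to/output");

        env.execute("Hadoop TextInputFormat example");
    }
}
~~~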

This section shows some examples for connecting Flink to other systems.
[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).

## Avro support in Flink

Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This makes it easy to read from Avro files with Flink.
Also, Flink's serialization framework is able to handle classes generated from Avro schemas.

In order to read data from an Avro file, you have to specify an `AvroInputFormat`.

**Example**:

~~~java
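// "in" is an org.apache.flink.core.fs.Path pointing to the Avro file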
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
~~~

Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:

~~~java
usersDS.groupBy("name")
~~~

Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data intensive and thus probably slow to use.

Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use it as a join or grouping key.
Specifying a field in Avro like `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one type (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
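
For illustration, here is a sketch of an Avro schema combining these cases (the record and field names are made up). The `name` and `nullable_double` fields can be used as keys, while `union_double` would be generated as `Object`:

~~~json
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "nullable_double", "type": ["null", "double"]},
    {"name": "union_double", "type": ["double"]}
  ]
}
~~~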


### Access Microsoft Azure Table Storage

_Note: This example works starting from Flink 0.6-incubating_

This example uses the `HadoopInputFormat` wrapper with an existing Hadoop input format implementation to access [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).

1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, so we have to build the project ourselves.
Execute the following commands:

~~~bash
git clone https://github.com/mooso/azure-tables-hadoop.git
cd azure-tables-hadoop
mvn clean install
~~~

2. Set up a new Flink project using the quickstarts:

~~~bash
curl https://flink.apache.org/q/quickstart.sh | bash
~~~

3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:

~~~xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility</artifactId>
    <version>{{site.version}}</version>
</dependency>
<dependency>
    <groupId>com.microsoft.hadoop</groupId>
    <artifactId>microsoft-hadoop-azure</artifactId>
    <version>0.0.4</version>
</dependency>
~~~

`flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
`microsoft-hadoop-azure` adds the project we built earlier to our project.

The project is now prepared for starting to code. We recommend importing the project into an IDE, such as Eclipse or IntelliJ (import it as a Maven project!).
Browse to the code of the `Job.java` file. It is an empty skeleton for a Flink job.

Paste the following code into it:

~~~java
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.microsoft.hadoop.azure.AzureTableConfiguration;
import com.microsoft.hadoop.azure.AzureTableInputFormat;
import com.microsoft.hadoop.azure.WritableEntity;
import com.microsoft.windowsazure.storage.table.EntityProperty;

public class AzureTableExample {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // create an AzureTableInputFormat, using the Hadoop input format wrapper
        HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());

        // set the Account URI, something like: https://apacheflink.table.core.windows.net
        hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
        // set the secret storage key here
        hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
        // set the table name here
        hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");

        DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
        // a small example of how to use the data in a mapper
        DataSet<String> fin = input.map(new MapFunction<Tuple2<Text, WritableEntity>, String>() {
            @Override
            public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
                System.err.println("--------------------------------\nKey = " + arg0.f0);
                WritableEntity we = arg0.f1;

                for (Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
                    System.err.println("key=" + prop.getKey() + " ; value (asString)=" + prop.getValue().getValueAsString());
                }

                return arg0.f0.toString();
            }
        });

        // emit result (this works only locally)
        fin.print();

        // execute program
        env.execute("Azure Example");
    }
}
~~~

The example shows how to access an Azure table and turn the data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). You can then apply all known transformations to the `DataSet`.

## Access MongoDB

This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).