| --- |
| title: "Cluster Setup" |
| --- |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| This documentation is intended to provide instructions on how to run |
| Flink in a fully distributed fashion on a static (but possibly |
| heterogeneous) cluster. |
| |
| This involves two steps: first, installing and configuring Flink, and |
| second, optionally installing and configuring the [Hadoop Distributed |
| Filesystem](http://hadoop.apache.org/) (HDFS). |
| |
| * This will be replaced by the TOC |
| {:toc} |
| |
| ## Preparing the Cluster |
| |
| ### Software Requirements |
| |
| Flink runs on all *UNIX-like environments*, e.g. **Linux**, **Mac OS X**, |
| and **Cygwin** (for Windows) and expects the cluster to consist of **one master |
| node** and **one or more worker nodes**. Before you start to set up the system, |
| make sure you have the following software installed **on each node**: |
| |
| - **Java 1.7.x** or higher, |
| - **ssh** (sshd must be running to use the Flink scripts that manage |
| remote components) |
| |
| If your cluster does not fulfill these software requirements, you will need to |
| install or upgrade the respective software. |
| |
| For example, on Ubuntu Linux, type in the following commands to install Java and |
| ssh: |
| |
| ~~~bash |
| sudo apt-get install ssh |
| sudo apt-get install openjdk-7-jre |
| ~~~ |
| |
| You can check the correct installation of Java by issuing the following command: |
| |
| ~~~bash |
| java -version |
| ~~~ |
| |
| The command should output something comparable to the following on every node of |
| your cluster (depending on your Java version, there may be small differences): |
| |
| ~~~bash |
| java version "1.7.0_55" |
| Java(TM) SE Runtime Environment (build 1.7.0_55-b13) |
| Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode) |
| ~~~ |
| |
| To make sure the ssh daemon is running properly, you can use the command |
| |
| ~~~bash |
| ps aux | grep sshd |
| ~~~ |
| |
| Something comparable to the following line should appear in the output |
| of the command on every host of your cluster: |
| |
| ~~~bash |
| root 894 0.0 0.0 49260 320 ? Ss Jan09 0:13 /usr/sbin/sshd |
| ~~~ |
| |
| ### Configuring Remote Access with ssh |
| |
| In order to start/stop the remote processes, the master node requires access via |
| ssh to the worker nodes. It is most convenient to use ssh's public key |
| authentication for this. To set up public key authentication, log on to the |
| master as the user who will later execute all the Flink components. **The |
| same user (i.e. a user with the same user name) must also exist on all worker |
| nodes**. For the remainder of these instructions, we will refer to this user as |
| *flink*. Using the super user *root* is highly discouraged for security |
| reasons. |
| |
| Once you have logged in to the master node as the desired user, you must |
| generate a new public/private key pair. The following command will create a new |
| public/private key pair in the *.ssh* directory inside the home directory of |
| the user *flink*. See the ssh-keygen man page for more details. Note that |
| the private key is not protected by a passphrase. |
| |
| ~~~bash |
| ssh-keygen -b 2048 -P '' -f ~/.ssh/id_rsa |
| ~~~ |
| |
| Next, copy/append the content of the file *.ssh/id_rsa.pub* to your |
| authorized_keys file. The content of the authorized_keys file defines which |
| public keys are considered trustworthy during the public key authentication |
| process. On most systems the appropriate command is |
| |
| ~~~bash |
| cat .ssh/id_rsa.pub >> .ssh/authorized_keys |
| ~~~ |
| |
| On some Linux systems, the authorized keys file may also be expected by the ssh |
| daemon under *.ssh/authorized_keys2*. In either case, you should make sure the |
| file only contains those public keys which you consider trustworthy on each |
| node of the cluster. |
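| |
| On many systems, the ssh daemon additionally requires strict permissions on |
| the *.ssh* directory and the authorized keys file before it accepts public key |
| authentication. If in doubt, set the commonly expected permissions: |
| |
| ~~~bash |
| chmod 700 ~/.ssh |
| chmod 600 ~/.ssh/authorized_keys |
| ~~~ |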
| |
| Finally, the authorized keys file must be copied to every worker node of your |
| cluster. You can do this by repeatedly typing in |
| |
| ~~~bash |
| scp .ssh/authorized_keys <worker>:~/.ssh/ |
| ~~~ |
| |
| and replacing *\<worker\>* with the host name of the respective worker node. |
| After having finished the copy process, you should be able to log on to each |
| worker node from your master node via ssh without a password. |
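| |
| You can verify this with a quick test, again replacing *\<worker\>* with a |
| worker's host name; the command should print the worker's host name without |
| asking for a password: |
| |
| ~~~bash |
| ssh <worker> hostname |
| ~~~ |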
| |
| ### Setting JAVA_HOME on each Node |
| |
| Flink requires the `JAVA_HOME` environment variable to be set on the |
| master and all worker nodes and to point to the directory of your Java |
| installation. |
| |
| You can set this variable in `conf/flink-conf.yaml` via the |
| `env.java.home` key. |
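| |
| For example, the entry could look as follows (the path is a placeholder for |
| your actual Java installation directory): |
| |
| ~~~yaml |
| env.java.home: /usr/lib/jvm/java-7-openjdk-amd64 |
| ~~~ |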
| |
| Alternatively, add the following line to your shell profile. If you use the |
| *bash* shell (probably the most common shell), the shell profile is located in |
| *\~/.bashrc*: |
| |
| ~~~bash |
| export JAVA_HOME=/path/to/java_home/ |
| ~~~ |
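| |
| After editing the profile, you can apply the change to your current shell |
| session (new login sessions pick it up automatically): |
| |
| ~~~bash |
| source ~/.bashrc |
| ~~~ |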
| |
| If your ssh daemon supports user environments, you can also add `JAVA_HOME` to |
| *~/.ssh/environment*. As the super user *root*, you can enable ssh user |
| environments with the following commands: |
| |
| ~~~bash |
| echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config |
| /etc/init.d/ssh restart |
| |
| # on some systems you might need to replace the above line with |
| /etc/init.d/sshd restart |
| ~~~ |
| |
| |
| ## Flink Setup |
| |
| Go to the [downloads page]({{site.baseurl}}/downloads.html) and get the ready-to-run |
| package. Make sure to pick the Flink package **matching your Hadoop |
| version**. |
| |
| After downloading the latest release, copy the archive to your master node and |
| extract it: |
| |
| ~~~bash |
| tar xzf flink-*.tgz |
| cd flink-* |
| ~~~ |
| |
| ### Configuring the Cluster |
| |
| After having extracted the system files, you need to configure Flink for |
| the cluster by editing *conf/flink-conf.yaml*. |
| |
| Set the `jobmanager.rpc.address` key to point to your master node. Furthermore, |
| define the maximum amount of main memory the JVM is allowed to allocate on each |
| node by setting the `jobmanager.heap.mb` and `taskmanager.heap.mb` keys. |
| |
| The values are given in MB. If some worker nodes have more main memory that you |
| want to allocate to the Flink system, you can override the default value |
| by setting the environment variable `FLINK_TM_HEAP` on the respective |
| node. |
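| |
| For instance, a minimal set of entries in *conf/flink-conf.yaml* could look as |
| follows (the host name and memory sizes are placeholders that you need to |
| adapt to your cluster): |
| |
| ~~~yaml |
| jobmanager.rpc.address: master-node |
| jobmanager.heap.mb: 1024 |
| taskmanager.heap.mb: 2048 |
| ~~~ |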
| |
| Finally, you must provide a list of all nodes in your cluster that shall be used |
| as worker nodes. To do so, similar to the HDFS configuration, edit the file |
| *conf/slaves* and enter the IP/host name of each worker node. Each worker node |
| will later run a TaskManager. |
| |
| Each entry must be on a separate line, as in the following example: |
| |
| ~~~ |
| 192.168.0.100 |
| 192.168.0.101 |
| . |
| . |
| . |
| 192.168.0.150 |
| ~~~ |
| |
| The Flink directory must be available on every worker under the same |
| path. Similarly to HDFS, you can use a shared NFS directory, or copy the |
| entire Flink directory to every worker node. |
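| |
| If you do not use a shared directory, one way to distribute the directory is |
| to copy it to each worker listed in *conf/slaves*, for example with `rsync` |
| (assuming `rsync` is installed and the parent of the target path exists on |
| each worker): |
| |
| ~~~bash |
| # copy the Flink directory to every worker listed in conf/slaves |
| for worker in $(cat conf/slaves); do |
|   rsync -a /path/to/flink/ "$worker":/path/to/flink/ |
| done |
| ~~~ |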
| |
| Please see the [configuration page](config.html) for details and additional |
| configuration options. |
| |
| In particular, |
| |
| * the amount of available memory per TaskManager (`taskmanager.heap.mb`), |
| * the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`), |
| * the total number of CPUs in the cluster (`parallelism.default`) and |
| * the temporary directories (`taskmanager.tmp.dirs`) |
| |
| are very important configuration values. |
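| |
| As a sketch, the corresponding entries in *conf/flink-conf.yaml* could look |
| like this (all values are placeholders that must be adapted to your hardware): |
| |
| ~~~yaml |
| taskmanager.heap.mb: 2048 |
| taskmanager.numberOfTaskSlots: 4 |
| parallelism.default: 40 |
| taskmanager.tmp.dirs: /tmp/flink-tmp |
| ~~~ |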
| |
| |
| ### Starting Flink |
| |
| The following script starts a JobManager on the local node and connects via |
| SSH to all worker nodes listed in the *slaves* file to start a |
| TaskManager on each node. After that, your Flink system is up and |
| running, and the JobManager on the local node accepts jobs at the |
| configured RPC port. |
| |
| Assuming that you are on the master node and inside the Flink directory: |
| |
| ~~~bash |
| bin/start-cluster.sh |
| ~~~ |
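| |
| To check whether the processes came up, you can, for example, list the running |
| Java processes with the JDK's `jps` tool; the output on the master should |
| include a `JobManager` entry, and the output on each worker a `TaskManager` |
| entry: |
| |
| ~~~bash |
| jps |
| ~~~ |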
| |
| To stop Flink, there is also a `stop-cluster.sh` script. |
| |
| |
| ### Starting Flink in Streaming Mode |
| |
| To start Flink in streaming mode, use the streaming variant of the startup |
| script: |
| |
| ~~~bash |
| bin/start-cluster-streaming.sh |
| ~~~ |
| |
| The streaming mode changes the startup behavior of Flink: the system does not |
| bring up the managed memory services with preallocated memory at startup. |
| Flink streaming does not use the managed memory employed by the batch |
| operators, so by not starting these services with preallocated memory, |
| streaming jobs can benefit from more available heap space. |
| |
| Note that you can still start batch jobs in streaming mode. The memory manager |
| will then allocate memory segments from the Java heap as needed. |
| |
| |
| ## Optional: Hadoop Distributed Filesystem (HDFS) Setup |
| |
| **NOTE** Flink does not require HDFS to run; HDFS is simply a typical choice of a distributed data |
| store to read data from (in parallel) and write results to. |
| If HDFS is already available on the cluster, or Flink is used purely with different storage |
| technologies (e.g., Apache Kafka, JDBC, RabbitMQ, or other storage systems or message queues), this |
| setup step is not needed. |
| |
| |
| The following instructions give a general overview of the usually required settings. Please consult one of the |
| many installation guides available online for more detailed instructions. |
| |
| __Note that the following instructions are based on Hadoop 1.2 and might differ |
| for Hadoop 2.__ |
| |
| ### Downloading, Installing, and Configuring HDFS |
| |
| Similar to Flink, HDFS runs in a distributed fashion. HDFS |
| consists of a **NameNode**, which manages the distributed file system's |
| metadata. The actual data is stored by one or more **DataNodes**. For the |
| remainder of these instructions, we assume that HDFS's NameNode component runs |
| on the master node, while all the worker nodes run an HDFS DataNode. |
| |
| To start, log on to your master node and download Hadoop (which includes HDFS) |
| from the Apache [Hadoop Releases](http://hadoop.apache.org/releases.html) page. |
| |
| Next, extract the Hadoop archive. |
| |
| After having extracted the Hadoop archive, change into the Hadoop directory and |
| edit the Hadoop environment configuration file: |
| |
| ~~~bash |
| cd hadoop-* |
| vi conf/hadoop-env.sh |
| ~~~ |
| |
| Uncomment and modify the following line in the file according to the path of |
| your Java installation. |
| |
| ~~~ |
| export JAVA_HOME=/path/to/java_home/ |
| ~~~ |
| |
| Save the changes and open the HDFS configuration file *conf/hdfs-site.xml*. HDFS |
| offers multiple configuration parameters which affect the behavior of the |
| distributed file system in various ways. The following excerpt shows a minimal |
| configuration which is required to make HDFS work. More information on how to |
| configure HDFS can be found in the [HDFS User |
| Guide](http://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html). |
| |
| ~~~xml |
| <configuration> |
| <property> |
| <name>fs.default.name</name> |
| <value>hdfs://MASTER:50040/</value> |
| </property> |
| <property> |
| <name>dfs.data.dir</name> |
| <value>DATAPATH</value> |
| </property> |
| </configuration> |
| ~~~ |
| |
| Replace *MASTER* with the IP/host name of your master node which runs the |
| *NameNode*. *DATAPATH* must be replaced with the path to the directory in which the |
| actual HDFS data shall be stored on each worker node. Make sure that the |
| *flink* user has sufficient permissions to read and write in that |
| directory. |
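| |
| For example, assuming a placeholder data path of */data/hdfs*, you could |
| create the directory on each worker node and hand it over to the *flink* user |
| like this (run as *root*): |
| |
| ~~~bash |
| mkdir -p /data/hdfs |
| chown flink /data/hdfs |
| ~~~ |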
| |
| After having saved the HDFS configuration file, open the file *conf/slaves* and |
| enter the IP/host name of those worker nodes which shall act as *DataNode*s. |
| Each entry must be on a separate line. |
| |
| ~~~ |
| <worker 1> |
| <worker 2> |
| . |
| . |
| . |
| <worker n> |
| ~~~ |
| |
| Initialize HDFS by typing in the following command. Note that the |
| command will **delete all data** previously stored in |
| HDFS. However, since we have just installed a fresh HDFS, it should be |
| safe to answer the confirmation with *yes*. |
| |
| ~~~bash |
| bin/hadoop namenode -format |
| ~~~ |
| |
| Finally, we need to make sure that the Hadoop directory is available to |
| all worker nodes which are intended to act as DataNodes and that all nodes |
| **find the directory under the same path**. We recommend using a shared network |
| directory (e.g., an NFS share) for that. Alternatively, one can copy the |
| directory to all nodes (with the disadvantage that all configuration and |
| code updates need to be synced to all nodes). |
| |
| ### Starting HDFS |
| |
| To start HDFS, log on to the master and type in the following |
| commands: |
| |
| ~~~bash |
| cd hadoop-* |
| bin/start-dfs.sh |
| ~~~ |
| |
| If your HDFS setup is correct, you should be able to open the HDFS |
| status website at *http://MASTER:50070*. Within a few seconds, |
| all DataNodes should appear as live nodes. For troubleshooting, we would |
| like to point you to the [Hadoop Quick |
| Start](http://wiki.apache.org/hadoop/QuickStart) |
| guide. |
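| |
| You can also check the state of the file system from the command line. With |
| Hadoop 1.x, the following command, run from the Hadoop directory, prints a |
| report that includes all live DataNodes: |
| |
| ~~~bash |
| bin/hadoop dfsadmin -report |
| ~~~ |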
| |
| |