---
title: "Cluster Execution"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
* This will be replaced by the TOC
{:toc}
Flink programs can run distributed on clusters of many machines. There
are three ways to send a program to a cluster for execution:
## Command Line Interface
The command line interface lets you submit packaged programs (JARs) to a cluster
(or single machine setup).
Please refer to the [Command Line Interface](cli.html) documentation for
details.
## Remote Environment
The remote environment lets you execute Flink Java programs on a cluster
directly from within your program. It points to the cluster on which you want
to execute the program, identified by host name and port.
### Maven Dependency
If you are developing your program as a Maven project, you have to add the
`flink-clients` module using this dependency:
~~~xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>{{ site.FLINK_VERSION_SHORT }}</version>
</dependency>
~~~
### Example
The following illustrates the use of the `RemoteEnvironment`:
~~~java
public static void main(String[] args) throws Exception {
    // Host name and port of the cluster, plus the JAR that contains the user code
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("strato-master", 7661, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    env.execute();
}
~~~
Note that the program contains custom user code and hence requires a JAR file with
the classes of the code attached. The `createRemoteEnvironment` factory method
takes the path(s) to the JAR file(s) after the host name and port.
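
The host, port, and JAR path(s) do not have to be hard-coded. The following is a minimal sketch, not part of Flink, of how they might be read from the command line before the remote environment is created. The class `RemoteTarget`, its `parse` method, and the `host:port` argument format are hypothetical names chosen for illustration. The sketch uses only the Java standard library, so the actual Flink call is shown as a comment.

```java
import java.util.Arrays;

/** Hypothetical helper (not a Flink class): holds the values a remote environment needs. */
final class RemoteTarget {
    final String host;
    final int port;
    final String[] jarFiles;

    RemoteTarget(String host, int port, String[] jarFiles) {
        this.host = host;
        this.port = port;
        this.jarFiles = jarFiles;
    }

    /** Expects args[0] as "host:port" (assumed format) and every further argument as a JAR path. */
    static RemoteTarget parse(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: <host:port> <jar> [<jar> ...]");
        }
        int colon = args[0].lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("expected host:port, got " + args[0]);
        }
        String host = args[0].substring(0, colon);
        // The port is passed on as an int; parseInt throws NumberFormatException for non-numeric input.
        int port = Integer.parseInt(args[0].substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        String[] jars = Arrays.copyOfRange(args, 1, args.length);
        return new RemoteTarget(host, port, jars);
    }

    public static void main(String[] args) {
        RemoteTarget target = parse(args);
        // With flink-clients on the classpath, the values would be handed over like this:
        // ExecutionEnvironment env = ExecutionEnvironment
        //     .createRemoteEnvironment(target.host, target.port, target.jarFiles);
        System.out.println(target.host + ":" + target.port + " " + Arrays.toString(target.jarFiles));
    }
}
```

Invoked with arguments such as `strato-master:7661 /home/user/udfs.jar`, the sketch yields the same host, port, and JAR path as the example above. Any additional arguments are treated as further JAR files.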
## Remote Executor
Similar to the `RemoteEnvironment`, the `RemoteExecutor` lets you execute
Flink programs on a cluster directly. The remote executor accepts a
*Plan* object, which describes the program as a single executable unit.
### Maven Dependency
If you are developing your program in a Maven project, you have to add the
`flink-clients` module using this dependency:
~~~xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>{{ site.FLINK_VERSION_SHORT }}</version>
</dependency>
~~~
### Example
The following illustrates the use of the `RemoteExecutor` with the Scala API:
~~~scala
def main(args: Array[String]) {
  // Output path (an assumed placeholder, mirroring the Java examples)
  val wordsOutput = "hdfs://path/to/result"

  val input = TextFile("hdfs://path/to/file")
  val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } }
  val counts = words groupBy { x => x } count()
  val output = counts.write(wordsOutput, CsvOutputFormat())

  // Bundle the data flow into a plan and submit it to the cluster
  val plan = new ScalaPlan(Seq(output), "Word Count")
  val executor = new RemoteExecutor("strato-master", 7881, "/path/to/jarfile.jar")
  executor.executePlan(plan)
}
~~~
The following illustrates the use of the `RemoteExecutor` with the Java API (as
an alternative to the RemoteEnvironment):
~~~java
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    // Turn the program into a plan and submit it to the cluster
    Plan p = env.createProgramPlan();
    RemoteExecutor e = new RemoteExecutor("strato-master", 7881, "/path/to/jarfile.jar");
    e.executePlan(p);
}
~~~
Note that the program contains custom UDFs and hence requires a JAR file with
the classes of the code attached. The constructor of the remote executor takes
the path(s) to the JAR file(s).