blob: b81afb77c4d2889fd8053e5036f83dcae6e03a81 [file] [log] [blame] [view]
---
hide:
- navigation
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Quick Start
===
This documentation gives a quick start guide for running Spark/Flink/MapReduce with Apache Celeborn™.
### Download Celeborn
Download the latest Celeborn binary from the [Downloading Page](https://celeborn.apache.org/download/).
Decompress the binary and set `$CELEBORN_HOME`.
```shell
tar -C <DST_DIR> -zxvf apache-celeborn-<VERSION>-bin.tgz
export CELEBORN_HOME=<Decompressed path>
```
## Configure Logging and Storage
#### Configure Logging
```shell
cd $CELEBORN_HOME/conf
cp log4j2.xml.template log4j2.xml
```
#### Configure Storage
Configure the directory to store shuffle data, for example `$CELEBORN_HOME/shuffle`.
```shell
cd $CELEBORN_HOME/conf
echo "celeborn.worker.storage.dirs=$CELEBORN_HOME/shuffle" > celeborn-defaults.conf
```
## Start Celeborn Service
#### Start Master
```shell
cd $CELEBORN_HOME
./sbin/start-master.sh
```
You should see `Master`'s ip:port in the log:
```log
INFO [main] NettyRpcEnvFactory: Starting RPC Server [Master] on 192.168.2.109:9097 with advertised endpoint 192.168.2.109:9097
```
#### Start Worker
Use the Master's IP and Port to start Worker:
```shell
cd $CELEBORN_HOME
./sbin/start-worker.sh celeborn://<Master IP>:<Master Port>
```
You should see the following message in Worker's log:
```log
INFO [main] MasterClient: connect to master 192.168.2.109:9097.
INFO [main] Worker: Register worker successfully.
INFO [main] Worker: Worker started.
```
And also the following message in Master's log:
```log
INFO [dispatcher-event-loop-9] Master: Registered worker
Host: 192.168.2.109
RpcPort: 57806
PushPort: 57807
FetchPort: 57809
ReplicatePort: 57808
SlotsUsed: 0
LastHeartbeat: 0
HeartbeatElapsedSeconds: xxx
Disks:
DiskInfo0: xxx
UserResourceConsumption: empty
WorkerRef: null
```
## Start Spark with Celeborn
#### Copy Celeborn Client to Spark's jars
Celeborn release binary contains clients for Spark 2.x and Spark 3.x, copy the corresponding client jar into Spark's
`jars/` directory:
```shell
cp $CELEBORN_HOME/spark/celeborn-client-spark-<spark.major.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $SPARK_HOME/jars/
```
#### Start spark-shell
Set `spark.shuffle.manager` to Celeborn's ShuffleManager, and turn off `spark.shuffle.service.enabled`:
```shell
cd $SPARK_HOME
./bin/spark-shell \
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.shuffle.service.enabled=false
```
Then run the following test case:
```scala
spark.sparkContext
.parallelize(1 to 10, 10)
.flatMap(_ => (1 to 100).iterator.map(num => num))
.repartition(10)
.count
```
During the Spark Job, you should see the following message in Celeborn Master's log:
```log
Master: Offer slots successfully for 10 reducers of local-1690000152711-0 on 1 workers.
```
And the following message in Celeborn Worker's log:
```log
INFO [dispatcher-event-loop-9] Controller: Reserved 10 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-8] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 10 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
```
## Start Flink with Celeborn
**Important: Only Flink batch jobs are supported for now.**
#### Copy Celeborn Client to Flink's lib
Celeborn release binary contains clients for Flink 1.16.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x, Flink 1.20.x, Flink 2.0.x and Flink 2.1.x, copy the corresponding client jar into Flink's
`lib/` directory:
```shell
cp $CELEBORN_HOME/flink/celeborn-client-flink-<flink.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $FLINK_HOME/lib/
```
#### Add Celeborn configuration to Flink's conf
Set `shuffle-service-factory.class` to Celeborn's ShuffleServiceFactory in Flink configuration file:
- Flink 1.16.x, Flink 1.17.x, Flink 1.18.x
```shell
cd $FLINK_HOME
vi conf/flink-conf.yaml
```
- Flink 1.19.x, Flink 1.20.x, Flink 2.0.x, Flink 2.1.x
```shell
cd $FLINK_HOME
vi conf/config.yaml
```
Choose one of flink integration strategies and add the following configuration:
**(Support Flink 1.16 and above versions) Flink Remote Shuffle Service Config**
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.
**(Support Flink 1.20 and above versions) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config**
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.
Then deploy the example word count job to the running cluster:
```shell
cd $FLINK_HOME
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
```
During the Flink Job, you should see the following message in Celeborn Master's log:
```log
Master: Offer slots successfully for 1 reducers of local-1690000152711-0 on 1 workers.
```
And the following message in Celeborn Worker's log:
```log
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 1 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
```
## Start MapReduce With Celeborn
### Copy Celeborn Client to MapReduce's classpath
1. Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
```shell
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <mapreduce.application.classpath>
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <yarn.application.classpath>
```
2. Restart your yarn cluster.
### Add Celeborn configuration to MapReduce's conf
- Modify configurations in `${HADOOP_CONF_DIR}/yarn-site.xml`.
```xml
<configuration>
<property>
<name>yarn.app.mapreduce.am.job.recovery.enable</name>
<value>false</value>
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<!-- Append 'org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn' to this setting -->
<value>org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn</value>
</property>
</configuration>
```
- Modify configurations in `${HADOOP_CONF_DIR}/mapred-site.xml`.
```xml
<configuration>
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
<value>1</value>
</property>
<property>
<name>mapreduce.celeborn.master.endpoints</name>
<!-- Replace placeholder to the real master address -->
<value>placeholder</value>
</property>
<property>
<name>mapreduce.job.map.output.collector.class</name>
<value>org.apache.hadoop.mapred.CelebornMapOutputCollector</value>
</property>
<property>
<name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
<value>org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer</value>
</property>
</configuration>
```
**Note**: `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.
Meanwhile, `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
Then deploy the example word count to the running cluster for verifying whether above configurations are correct.
```shell
cd $HADOOP_HOME
./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /someinput /someoutput
```
During the MapReduce Job, you should see the following message in Celeborn Master's log:
```log
Master: Offer slots successfully for 1 reducers of application_1694674023293_0003-0 on 1 workers.
```
And the following message in Celeborn Worker's log:
```log
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for application_1694674023293_0003-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for application_1694674023293_0003-0
INFO [async-reply] Controller: CommitFiles for application_1694674023293_0003-0 success with 1 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
```