This documentation gives a quick start guide for running Spark/Flink/MapReduce with Apache Celeborn™.
Download the latest Celeborn binary from the Downloading Page. Decompress the binary and set $CELEBORN_HOME:

tar -C <DST_DIR> -zxvf apache-celeborn-<VERSION>-bin.tgz
export CELEBORN_HOME=<Decompressed path>
Copy the log4j2 configuration template:

cd $CELEBORN_HOME/conf
cp log4j2.xml.template log4j2.xml
Configure the directory to store shuffle data, for example $CELEBORN_HOME/shuffle:

cd $CELEBORN_HOME/conf
echo "celeborn.worker.storage.dirs=$CELEBORN_HOME/shuffle" > celeborn-defaults.conf
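If you need more than this single option, celeborn-defaults.conf can be written in one go. A minimal sketch, assuming your Celeborn version accepts celeborn.master.endpoints and a comma-separated list for celeborn.worker.storage.dirs (check the configuration reference; the paths and the <Master IP> placeholder are for illustration only):

# Sketch only: write several options at once; adjust paths and the placeholder address.
cat > $CELEBORN_HOME/conf/celeborn-defaults.conf <<'EOF'
celeborn.master.endpoints=<Master IP>:9097
celeborn.worker.storage.dirs=/data1/celeborn,/data2/celeborn
EOF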
Start the Master:

cd $CELEBORN_HOME
./sbin/start-master.sh
You should see the Master's ip:port in the log:
INFO [main] NettyRpcEnvFactory: Starting RPC Server [Master] on 192.168.2.109:9097 with advertised endpoint 192.168.2.109:9097
Use the Master's IP and port to start a Worker:
cd $CELEBORN_HOME
./sbin/start-worker.sh celeborn://<Master IP>:<Master Port>
You should see the following message in Worker's log:
INFO [main] MasterClient: connect to master 192.168.2.109:9097.
INFO [main] Worker: Register worker successfully.
INFO [main] Worker: Worker started.
And also the following message in Master's log:
INFO [dispatcher-event-loop-9] Master: Registered worker Host: 192.168.2.109 RpcPort: 57806 PushPort: 57807 FetchPort: 57809 ReplicatePort: 57808 SlotsUsed: 0 LastHeartbeat: 0 HeartbeatElapsedSeconds: xxx Disks: DiskInfo0: xxx UserResourceConsumption: empty WorkerRef: null
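You can also sanity-check that both JVMs are up with jps; this is only a sketch and assumes a JDK on the PATH and that the processes show up under the names Master and Worker:

# Expect one Master and one Worker entry in the output.
jps | grep -E 'Master|Worker'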
The Celeborn release binary contains clients for Spark 2.x and Spark 3.x. Copy the corresponding client jar into Spark's jars/ directory:
cp $CELEBORN_HOME/spark/celeborn-client-spark-<spark.major.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $SPARK_HOME/jars/
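For example, with Spark 3.x and Scala 2.12 the copy might look like this; the exact file name depends on the Celeborn version you downloaded, so list $CELEBORN_HOME/spark/ first (the version numbers below are assumptions):

# Illustration only: pick the jar that matches your Spark and Scala versions.
ls $CELEBORN_HOME/spark/
cp $CELEBORN_HOME/spark/celeborn-client-spark-3-shaded_2.12-<celeborn.version>.jar $SPARK_HOME/jars/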
Set spark.shuffle.manager to Celeborn's ShuffleManager, and turn off spark.shuffle.service.enabled:
cd $SPARK_HOME
./bin/spark-shell \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.shuffle.service.enabled=false
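If you prefer not to repeat these flags on every invocation, the same settings can live in conf/spark-defaults.conf. A sketch, assuming spark.celeborn.master.endpoints is the right key for pointing the client at a non-local Master in your Celeborn version (the address is a placeholder):

# Sketch: persist the Celeborn settings for all Spark jobs.
cat >> $SPARK_HOME/conf/spark-defaults.conf <<'EOF'
spark.shuffle.manager            org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.shuffle.service.enabled    false
spark.celeborn.master.endpoints  <Master IP>:9097
EOF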
Then run the following test case:
spark.sparkContext
  .parallelize(1 to 10, 10)
  .flatMap(_ => (1 to 100).iterator.map(num => num))
  .repartition(10)
  .count
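A batch submission works the same way. As a sketch, assuming your Spark distribution ships the GroupByTest example (which performs a shuffle) under examples/jars/:

# Sketch: run a shuffle-heavy bundled example through Celeborn; the arguments are
# numMappers numKVPairs valSize numReducers and are illustrative only.
./bin/spark-submit \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.shuffle.service.enabled=false \
  --class org.apache.spark.examples.GroupByTest \
  examples/jars/spark-examples_*.jar 10 1000 100 10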
During the Spark Job, you should see the following message in Celeborn Master's log:
Master: Offer slots successfully for 10 reducers of local-1690000152711-0 on 1 workers.
And the following message in Celeborn Worker's log:
INFO [dispatcher-event-loop-9] Controller: Reserved 10 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-8] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 10 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
Important: Only Flink batch jobs are supported for now.
The Celeborn release binary contains clients for Flink 1.16.x, 1.17.x, 1.18.x, 1.19.x, 1.20.x, 2.0.x, and 2.1.x. Copy the corresponding client jar into Flink's lib/ directory:
cp $CELEBORN_HOME/flink/celeborn-client-flink-<flink.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $FLINK_HOME/lib/
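For example, with Flink 1.20 and Scala 2.12 the copy might look like this; list $CELEBORN_HOME/flink/ for the exact jar name shipped with your Celeborn version (the version numbers below are assumptions):

# Illustration only: pick the jar that matches your Flink and Scala versions.
ls $CELEBORN_HOME/flink/
cp $CELEBORN_HOME/flink/celeborn-client-flink-1.20-shaded_2.12-<celeborn.version>.jar $FLINK_HOME/lib/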
Set shuffle-service-factory.class to Celeborn's ShuffleServiceFactory in the Flink configuration file:

cd $FLINK_HOME
vi conf/flink-conf.yaml    # older Flink releases
# or
vi conf/config.yaml        # newer Flink releases
Choose one of the following Flink integration strategies and add the corresponding configuration:
Flink Remote Shuffle Service config (supported on Flink 1.16 and above):
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
Note: the config option execution.batch-shuffle-mode must be set to ALL_EXCHANGES_BLOCKING.
Flink hybrid shuffle config (supported on Flink 1.20 and above):
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
Note: the config option execution.batch-shuffle-mode must be set to ALL_EXCHANGES_HYBRID_FULL.
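With either strategy, the Flink client also needs the Celeborn Master address. A minimal sketch, assuming celeborn.master.endpoints is accepted in the Flink configuration file for your Celeborn and Flink versions (verify against the configuration reference; the address is a placeholder):

# Sketch: point the Flink-side Celeborn client at the Master.
# (Append to conf/config.yaml instead on newer Flink releases.)
cat >> $FLINK_HOME/conf/flink-conf.yaml <<'EOF'
celeborn.master.endpoints: <Master IP>:9097
EOF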
Then deploy the example word count job to the running cluster:
cd $FLINK_HOME
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
During the Flink Job, you should see the following message in Celeborn Master's log:
Master: Offer slots successfully for 1 reducers of local-1690000152711-0 on 1 workers.
And the following message in Celeborn Worker's log:
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
Copy $CELEBORN_HOME/mr/*.jar into mapreduce.application.classpath and yarn.application.classpath:

cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <mapreduce.application.classpath>
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <yarn.application.classpath>
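If you have not customized those properties, one workable sketch is to drop the jar into directories that stock Hadoop already puts on mapreduce.application.classpath and yarn.application.classpath; the destinations below are assumptions based on a default Hadoop layout:

# Sketch only: adjust the destinations to whatever your classpath properties actually contain.
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_*.jar $HADOOP_HOME/share/hadoop/mapreduce/
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_*.jar $HADOOP_HOME/share/hadoop/yarn/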
Add the following configuration to ${HADOOP_CONF_DIR}/yarn-site.xml:

<configuration>
  <property>
    <name>yarn.app.mapreduce.am.job.recovery.enable</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.command-opts</name>
    <!-- Append 'org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn' to this setting -->
    <value>org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn</value>
  </property>
</configuration>
Add the following configuration to ${HADOOP_CONF_DIR}/mapred-site.xml:

<configuration>
  <property>
    <name>mapreduce.job.reduce.slowstart.completedmaps</name>
    <value>1</value>
  </property>
  <property>
    <name>mapreduce.celeborn.master.endpoints</name>
    <!-- Replace placeholder with the real master address -->
    <value>placeholder</value>
  </property>
  <property>
    <name>mapreduce.job.map.output.collector.class</name>
    <value>org.apache.hadoop.mapred.CelebornMapOutputCollector</value>
  </property>
  <property>
    <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
    <value>org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer</value>
  </property>
</configuration>
Note: MRAppMasterWithCeleborn supports setting mapreduce.celeborn.master.endpoints via the environment variable CELEBORN_MASTER_ENDPOINTS. It also disables yarn.app.mapreduce.am.job.recovery.enable and sets mapreduce.job.reduce.slowstart.completedmaps to 1 by default.
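As a sketch of that environment-variable route (the address is a placeholder, and how the variable reaches the Application Master, for example via yarn.app.mapreduce.am.env, depends on your cluster setup):

# Sketch: provide the Celeborn Master address to MRAppMasterWithCeleborn
# through the environment instead of mapred-site.xml.
export CELEBORN_MASTER_ENDPOINTS=<Master IP>:9097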
Then submit the example word count job to the running cluster to verify that the above configurations are correct:
cd $HADOOP_HOME
./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /someinput /someoutput
During the MapReduce Job, you should see the following message in Celeborn Master's log:
Master: Offer slots successfully for 1 reducers of application_1694674023293_0003-0 on 1 workers.
And the following message in Celeborn Worker's log:
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for application_1694674023293_0003-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for application_1694674023293_0003-0
INFO [async-reply] Controller: CommitFiles for application_1694674023293_0003-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
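Once the job finishes, you can spot-check the result; this sketch assumes a single reducer writing part-r-00000 under /someoutput:

# Sketch: print the first lines of the word count output.
./bin/hadoop fs -cat /someoutput/part-r-00000 | head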