[#1086] [Doc] Simplify the Gluten code and add the doc (#1322)
* Integrate Gluten for branch-0.8
* spotless check
* Add WriteBufferManagerTest
* todo
* Remove the addPartition method and add documentation
diff --git a/README.md b/README.md
index a8134d6..989c3b4 100644
--- a/README.md
+++ b/README.md
@@ -258,6 +258,16 @@
spark.dynamicAllocation.enabled true
```
+### Support Spark Columnar Shuffle with Gluten
+To support Spark columnar shuffle with Uniffle, use the Gluten client;
+refer to the [Gluten Project](https://github.com/oap-project/gluten).
+
+Update Spark conf to enable integration of Uniffle with Gluten:
+ ```
+ spark.plugins io.glutenproject.GlutenPlugin
+ spark.shuffle.manager org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager
+ ```
+
### Deploy MapReduce Client
1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 330f56c..6efdfa3 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -86,7 +86,6 @@
private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
private final Set<ShuffleServerInfo> shuffleServersForData;
private final long[] partitionLengths;
- private final boolean isMemoryShuffleEnabled;
private final Function<String, Boolean> taskFailureCallback;
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
@@ -94,6 +93,7 @@
protected final long taskAttemptId;
protected final ShuffleWriteMetrics shuffleWriteMetrics;
+ protected final boolean isMemoryShuffleEnabled;
private final BlockingQueue<Object> finishEventQueue = new LinkedBlockingQueue<>();
@@ -213,7 +213,7 @@
}
}
- private void writeImpl(Iterator<Product2<K, V>> records) {
+ protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
List<ShuffleBlockInfo> shuffleBlockInfos;
boolean isCombine = shuffleDependency.mapSideCombine();
Function1<V, C> createCombiner = null;
@@ -243,7 +243,7 @@
processShuffleBlockInfos(shuffleBlockInfos);
}
long checkStartTs = System.currentTimeMillis();
- checkBlockSendResult(blockIds);
+ internalCheckBlockSendResult();
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
if (!isMemoryShuffleEnabled) {
@@ -309,6 +309,10 @@
return futures;
}
+ protected void internalCheckBlockSendResult() {
+ checkBlockSendResult(blockIds);
+ }
+
@VisibleForTesting
protected void checkBlockSendResult(Set<Long> blockIds) {
boolean interrupted = false;