/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.lang.reflect.Field;

import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

import org.apache.uniffle.common.exception.RssException;
// In MR shuffle, MapOutput encapsulates the logic to fetch a map task's output data via HTTP.
// In RSS, that logic is bypassed and the fetched data is written directly into the MapOutput.
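//
// Illustrative call sequence (a sketch only, not part of this class; the reserve/commit calls
// are made by the RSS fetcher, and the variable names here are hypothetical):
//   MapOutput mapOutput = merger.reserve(mapId, uncompressedLength, fetcherId);
//   RssBypassWriter.write(mapOutput, uncompressedBytes);
//   mapOutput.commit();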
public class RssBypassWriter {
  public static void write(MapOutput mapOutput, byte[] buffer) {
    // Write uncompressed data into the MapOutput reserved by the merger.
    // In most cases the merger allocates memory to accept the data, but when the data size
    // exceeds the memory threshold it can also allocate a disk-backed output, so both
    // situations have to be handled.
    if (mapOutput instanceof InMemoryMapOutput) {
      InMemoryMapOutput inMemoryMapOutput = (InMemoryMapOutput) mapOutput;
      // The InMemoryMapOutput constructor creates a decompressor or borrows one from the codec
      // pool. Return it here; otherwise every InMemoryMapOutput object keeps its own
      // decompressor, which leads to "out of direct memory" problems.
      CodecPool.returnDecompressor(getDecompressor(inMemoryMapOutput));
      write(inMemoryMapOutput, buffer);
    } else if (mapOutput instanceof OnDiskMapOutput) {
      // RSS uses its own compression, which is incompatible with Hadoop's on-disk shuffle file
      // compression, so this case has to be rejected.
      throw new IllegalStateException(
          "RSS does not support OnDiskMapOutput as shuffle output,"
              + " try to reduce mapreduce.reduce.shuffle.memory.limit.percent");
    } else {
      throw new IllegalStateException(
          "Merger reserved an unknown type of MapOutput: "
              + mapOutput.getClass().getCanonicalName());
    }
  }
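  // Copies the fetched, already-uncompressed bytes into the byte array the merger reserved for
  // this map output; the reserved array must hold at least buffer.length bytes, otherwise
  // System.arraycopy throws.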
  private static void write(InMemoryMapOutput inMemoryMapOutput, byte[] buffer) {
    byte[] memory = inMemoryMapOutput.getMemory();
    System.arraycopy(buffer, 0, memory, 0, buffer.length);
  }
  static Decompressor getDecompressor(InMemoryMapOutput inMemoryMapOutput) {
    try {
      // InMemoryMapOutput keeps its decompressor in a private field without a getter,
      // so it has to be read reflectively.
      Field decompressorField = InMemoryMapOutput.class.getDeclaredField("decompressor");
      decompressorField.setAccessible(true);
      return (Decompressor) decompressorField.get(inMemoryMapOutput);
    } catch (Exception e) {
      throw new RssException("Failed to get decompressor: " + e.getMessage());
    }
  }
}