/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.shuffle;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;

import javax.annotation.Nullable;
import javax.crypto.SecretKey;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedPartitionStatsProto;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.apache.tez.util.FastNumberFormat;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShuffleUtils {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
private static final long MB = 1024L * 1024L;
static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
new ThreadLocal<DecimalFormat>() {
@Override
protected DecimalFormat initialValue() {
return new DecimalFormat("0.00");
}
};
static final ThreadLocal<FastNumberFormat> MBPS_FAST_FORMAT =
new ThreadLocal<FastNumberFormat>() {
@Override
protected FastNumberFormat initialValue() {
FastNumberFormat fmt = FastNumberFormat.getInstance();
fmt.setMinimumIntegerDigits(2);
return fmt;
}
};
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
throws IOException {
DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(meta);
Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.readFields(in);
SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
return sk;
}
public static ByteBuffer convertJobTokenToBytes(
Token<JobTokenIdentifier> jobToken) throws IOException {
return TezCommonUtils.convertJobTokenToBytes(jobToken);
}
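  // Illustrative round trip (a sketch; "jobToken" is a hypothetical, already
  // constructed Token<JobTokenIdentifier>): the two helpers above are
  // near-inverses, so the token travels as bytes and the shuffle secret is
  // restored on the other side.
  //
  //   ByteBuffer meta = ShuffleUtils.convertJobTokenToBytes(jobToken);
  //   SecretKey secret = ShuffleUtils.getJobTokenSecretFromTokenBytes(meta);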
public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
throws IOException {
return TezRuntimeUtils.deserializeShuffleProviderMetaData(meta);
}
public static void shuffleToMemory(byte[] shuffleData,
InputStream input, int decompressedLength, int compressedLength,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
Logger LOG, InputAttemptIdentifier identifier) throws IOException {
try {
IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
ifileReadAhead, ifileReadAheadLength);
// metrics.inputBytes(shuffleData.length);
if (LOG.isDebugEnabled()) {
LOG.debug("Read " + shuffleData.length + " bytes from input for "
+ identifier);
}
} catch (InternalError | Exception e) {
// Close the streams
LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength +
", decomp=" + decompressedLength + ". ExceptionMessage=" + e.getMessage());
ioCleanup(input);
if (e instanceof InternalError) {
        // The codecs for lzo, lz4, snappy, bz2, etc. can throw
        // java.lang.InternalError on decompression failures. Catch and
        // re-throw as an IOException so the fetch-failure logic can handle it.
throw new IOException(e);
} else if (e instanceof IOException) {
throw e;
} else {
// Re-throw as an IOException
throw new IOException(e);
}
}
}
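  // A fetcher-side usage sketch (variable names here are hypothetical, not
  // part of this class): allocate a buffer sized to the decompressed length
  // advertised by the shuffle header, then let shuffleToMemory fill and
  // decompress it in place.
  //
  //   byte[] buffer = new byte[decompressedLength];
  //   ShuffleUtils.shuffleToMemory(buffer, httpInputStream, decompressedLength,
  //       compressedLength, codec, ifileReadAhead, ifileReadAheadLength,
  //       LOG, srcAttemptIdentifier);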
public static void shuffleToDisk(OutputStream output, String hostIdentifier,
InputStream input, long compressedLength, long decompressedLength, Logger LOG, InputAttemptIdentifier identifier,
boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException {
// Copy data to local-disk
long bytesLeft = compressedLength;
try {
if (verifyChecksum) {
bytesLeft -= IFile.Reader.readToDisk(output, input, compressedLength,
ifileReadAhead, ifileReadAheadLength);
} else {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading "
+ identifier);
}
output.write(buf, 0, n);
bytesLeft -= n;
// metrics.inputBytes(n);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Read " + (compressedLength - bytesLeft)
+ " bytes from input for " + identifier);
}
output.close();
} catch (IOException ioe) {
// Close the streams
LOG.info("Failed to read data to disk for " + identifier + ". len=" + compressedLength +
", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage());
ioCleanup(input, output);
// Re-throw
throw ioe;
}
// Sanity check
if (bytesLeft != 0) {
throw new IOException("Incomplete map output received for " +
identifier + " from " +
hostIdentifier + " (" +
bytesLeft + " bytes missing of " +
compressedLength + ")");
}
}
public static void ioCleanup(Closeable... closeables) {
for (Closeable c : closeables) {
if (c == null)
continue;
try {
c.close();
} catch (IOException e) {
if (LOG.isDebugEnabled())
LOG.debug("Exception in closing " + c, e);
}
}
}
public static StringBuilder constructBaseURIForShuffleHandler(String host,
int port, int partition, int partitionCount, String appId, int dagIdentifier, boolean sslShuffle) {
final String httpProtocol = sslShuffle ? "https://" : "http://";
StringBuilder sb = new StringBuilder(httpProtocol);
sb.append(host);
sb.append(":");
sb.append(port);
sb.append("/");
sb.append("mapOutput?job=");
sb.append(appId.replace("application", "job"));
sb.append("&dag=");
sb.append(String.valueOf(dagIdentifier));
sb.append("&reduce=");
sb.append(String.valueOf(partition));
if (partitionCount > 1) {
sb.append("-");
sb.append(String.valueOf(partition + partitionCount - 1));
}
sb.append("&map=");
return sb;
}
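  // For example (all values illustrative): host "node1", port 13562, a
  // partition range of 3..6 (partition=3, partitionCount=4) and DAG 2 of
  // application "application_1_0001" yield the base URI
  //
  //   http://node1:13562/mapOutput?job=job_1_0001&dag=2&reduce=3-6&map=
  //
  // Attempt path components are appended after "map=" by constructInputURL
  // below.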
public static URL constructInputURL(String baseURI,
Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException {
StringBuilder url = new StringBuilder(baseURI);
boolean first = true;
for (InputAttemptIdentifier input : inputs) {
if (first) {
first = false;
url.append(input.getPathComponent());
} else {
url.append(",").append(input.getPathComponent());
}
}
    // The cluster-wide keep-alive setting can be overridden per request by
    // adding keepAlive to the URL; see MAPREDUCE-5787 for enabling/disabling
    // keep-alive in the cluster.
if (keepAlive) {
url.append("&keepAlive=true");
}
return new URL(url.toString());
}
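  // Continuing the example above (the path components shown are illustrative),
  // two inputs with keep-alive enabled produce a comma-separated map list:
  //
  //   ...&map=attempt_1_0001_1_01_000000_0_10003,attempt_1_0001_1_01_000001_0_10003&keepAlive=true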
public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager)
throws IOException {
return TezRuntimeUtils.getHttpConnection(asyncHttp, url, params, logIdentifier, jobTokenSecretManager);
}
public static String stringify(DataMovementEventPayloadProto dmProto) {
StringBuilder sb = new StringBuilder();
sb.append("[");
if (dmProto.hasEmptyPartitions()) {
sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", ");
}
sb.append("host: " + dmProto.getHost()).append(", ");
sb.append("port: " + dmProto.getPort()).append(", ");
sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
sb.append("hasDataInEvent: " + dmProto.hasData());
sb.append("]");
return sb.toString();
}
/**
 * Generate the payload for a DataMovementEvent.
 *
 * @param sendEmptyPartitionDetails whether to encode the empty-partition bitset
 * @param numPhysicalOutputs number of physical outputs (partitions)
 * @param spillRecord spill record describing the partitions of this spill
 * @param context output context of the producing task
 * @param spillId id of the spill (relevant when final merge is disabled)
 * @param finalMergeEnabled whether the final merge of spills is enabled
 * @param isLastEvent whether this is the last event for this output
 * @param pathComponent path component consumers use to locate the output
 * @param auxiliaryService name of the shuffle auxiliary service
 * @param deflater deflater used to compress the empty-partition bitset
 * @return ByteBuffer containing the serialized payload
 * @throws IOException
 */
static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context,
int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, String auxiliaryService, Deflater deflater)
throws IOException {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
boolean outputGenerated = true;
if (sendEmptyPartitionDetails) {
BitSet emptyPartitionDetails = new BitSet();
for (int i = 0; i < spillRecord.size(); i++) {
TezIndexRecord indexRecord = spillRecord.getIndex(i);
if (!indexRecord.hasData()) {
emptyPartitionDetails.set(i);
}
}
int emptyPartitions = emptyPartitionDetails.cardinality();
outputGenerated = (spillRecord.size() != emptyPartitions);
if (emptyPartitions > 0) {
ByteString emptyPartitionsBytesString =
TezCommonUtils.compressByteArrayToByteString(
TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater);
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
+ numPhysicalOutputs + ", emptyPartitions=" + emptyPartitions
+ ", compressedSize=" + emptyPartitionsBytesString.size());
}
}
if (!sendEmptyPartitionDetails || outputGenerated) {
String host = context.getExecutionContext().getHostName();
ByteBuffer shuffleMetadata = context
.getServiceProviderMetaData(auxiliaryService);
int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
payloadBuilder.setHost(host);
payloadBuilder.setPort(shufflePort);
//Path component is always 0 indexed
payloadBuilder.setPathComponent(pathComponent);
}
if (!finalMergeEnabled) {
payloadBuilder.setSpillId(spillId);
payloadBuilder.setLastEvent(isLastEvent);
}
payloadBuilder.setRunDuration(0); //TODO: who is dependent on this?
DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
ByteBuffer payload = payloadProto.toByteString().asReadOnlyByteBuffer();
return payload;
}
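  // Consumer-side sketch (hedged; assumes the decompress/fromByteArray helpers
  // mirror the compress/toByteArray calls used above — see the shuffle input
  // event handlers for the real consumers):
  //
  //   DataMovementEventPayloadProto proto =
  //       DataMovementEventPayloadProto.parseFrom(
  //           ByteString.copyFrom(dmEvent.getUserPayload()));
  //   if (proto.hasEmptyPartitions()) {
  //     byte[] bytes = TezCommonUtils.decompressByteStringToByteArray(
  //         proto.getEmptyPartitions());
  //     BitSet emptyPartitions = TezUtilsInternal.fromByteArray(bytes);
  //   }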
/**
 * Generate events for an output which has not been started: all partitions
 * are reported as empty.
 *
 * @param eventList list to which the generated events are added
 * @param numPhysicalOutputs number of physical outputs (partitions)
 * @param context output context of the producing task
 * @param generateVmEvent whether to generate a vm event or not
 * @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent
 * @param deflater deflater used to compress the empty-partition bitset
 * @throws IOException
 */
public static void generateEventsForNonStartedOutput(List<Event> eventList,
int numPhysicalOutputs,
OutputContext context,
boolean generateVmEvent,
boolean isCompositeEvent, Deflater deflater) throws
IOException {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
// Construct the VertexManager event if required.
if (generateVmEvent) {
ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
vmBuilder.setOutputSize(0);
VertexManagerEvent vmEvent = VertexManagerEvent.create(
context.getDestinationVertexName(),
vmBuilder.build().toByteString().asReadOnlyByteBuffer());
eventList.add(vmEvent);
}
// Construct the DataMovementEvent
// Always set empty partition information since no files were generated.
LOG.info("Setting all {} partitions as empty for non-started output", numPhysicalOutputs);
BitSet emptyPartitionDetails = new BitSet(numPhysicalOutputs);
emptyPartitionDetails.set(0, numPhysicalOutputs, true);
ByteString emptyPartitionsBytesString =
TezCommonUtils.compressByteArrayToByteString(
TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater);
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
payloadBuilder.setRunDuration(0);
DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
ByteBuffer dmePayload = payloadProto.toByteString().asReadOnlyByteBuffer();
if (isCompositeEvent) {
CompositeDataMovementEvent cdme =
CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload);
eventList.add(cdme);
} else {
DataMovementEvent dme = DataMovementEvent.create(0, dmePayload);
eventList.add(dme);
}
}
/**
 * Generate events when a spill happens.
 *
 * @param eventList events are added to this list
 * @param finalMergeEnabled whether the final merge of spills is enabled
 * @param isLastEvent whether this is the last event for this output
 * @param context output context of the producing task
 * @param spillId id of the spill
 * @param spillRecord spill record describing the partitions of this spill
 * @param numPhysicalOutputs number of physical outputs (partitions)
 * @param sendEmptyPartitionDetails whether to encode the empty-partition bitset
 * @param pathComponent path component consumers use to locate the output
 * @param partitionStats bytes written per partition, or null if not tracked
 * @param reportDetailedPartitionStats whether to report exact per-partition sizes
 * @param auxiliaryService name of the shuffle auxiliary service
 * @param deflater deflater used to compress the empty-partition bitset
 * @throws IOException
 */
public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled,
boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord,
int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent,
@Nullable long[] partitionStats, boolean reportDetailedPartitionStats, String auxiliaryService, Deflater deflater)
throws IOException {
Preconditions.checkArgument(eventList != null, "EventList can't be null");
context.notifyProgress();
if (finalMergeEnabled) {
Preconditions.checkArgument(isLastEvent, "Can not send multiple events when final merge is "
+ "enabled");
}
if (LOG.isDebugEnabled()) {
LOG.debug("pathComponent=" + pathComponent + ", isLastEvent="
+ isLastEvent + ", spillId=" + spillId + ", finalMergeDisabled=" + finalMergeEnabled +
", numPhysicalOutputs=" + numPhysicalOutputs);
}
ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs,
spillRecord, context, spillId,
finalMergeEnabled, isLastEvent, pathComponent, auxiliaryService, deflater);
if (finalMergeEnabled || isLastEvent) {
VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,
reportDetailedPartitionStats, deflater);
eventList.add(vmEvent);
}
CompositeDataMovementEvent csdme =
CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload);
eventList.add(csdme);
}
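  // A producer-side usage sketch (assumed wiring; outputContext, spillRecord
  // and the remaining locals are hypothetical): collect the generated events
  // and hand them to the framework in one batch.
  //
  //   List<Event> events = new ArrayList<>();
  //   ShuffleUtils.generateEventOnSpill(events, true /* finalMergeEnabled */,
  //       true /* isLastEvent */, outputContext, 0 /* spillId */, spillRecord,
  //       numPartitions, sendEmptyPartitionDetails, pathComponent,
  //       partitionStats, reportDetailedStats, auxiliaryService, deflater);
  //   outputContext.sendEvents(events);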
public static VertexManagerEvent generateVMEvent(OutputContext context,
long[] sizePerPartition, boolean reportDetailedPartitionStats, Deflater deflater)
throws IOException {
ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
long outputSize = context.getCounters().
findCounter(TaskCounter.OUTPUT_BYTES).getValue();
// In pipelined shuffle, the output sizes reported by the individual events
// add up to the final output size. This information is needed for auto
// reduce-parallelism to work properly.
vmBuilder.setOutputSize(outputSize);
vmBuilder.setNumRecord(context.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue()
+ context.getCounters().findCounter(TaskCounter.OUTPUT_LARGE_RECORDS).getValue());
//set partition stats
if (sizePerPartition != null && sizePerPartition.length > 0) {
if (reportDetailedPartitionStats) {
vmBuilder.setDetailedPartitionStats(
getDetailedPartitionStatsForPhysicalOutput(sizePerPartition));
} else {
RoaringBitmap stats = getPartitionStatsForPhysicalOutput(
sizePerPartition);
DataOutputBuffer dout = new DataOutputBuffer();
stats.serialize(dout);
ByteString partitionStatsBytes =
TezCommonUtils.compressByteArrayToByteString(dout.getData(), deflater);
vmBuilder.setPartitionStats(partitionStatsBytes);
}
}
VertexManagerEvent vmEvent = VertexManagerEvent.create(
context.getDestinationVertexName(),
vmBuilder.build().toByteString().asReadOnlyByteBuffer());
return vmEvent;
}
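  // On the receiving side, a vertex manager can unpack the payload like this
  // (a hedged sketch; ShuffleVertexManager does something similar):
  //
  //   ShuffleUserPayloads.VertexManagerEventPayloadProto vmProto =
  //       ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(
  //           ByteString.copyFrom(vmEvent.getUserPayload()));
  //   long producedBytes = vmProto.getOutputSize();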
/**
 * Encode coarse per-partition data sizes for the destinations.
 *
 * @param sizes sizes of the physical outputs, in bytes
 * @return bitmap with one set bit per partition, encoding that partition's
 *         DATA_RANGE_IN_MB bucket
 */
public static RoaringBitmap getPartitionStatsForPhysicalOutput(long[] sizes) {
RoaringBitmap partitionStats = new RoaringBitmap();
if (sizes == null || sizes.length == 0) {
return partitionStats;
}
final int RANGE_LEN = DATA_RANGE_IN_MB.values().length;
for (int i = 0; i < sizes.length; i++) {
int bucket = DATA_RANGE_IN_MB.getRange(sizes[i]).ordinal();
int index = i * (RANGE_LEN);
partitionStats.add(index + bucket);
}
return partitionStats;
}
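  // Worked example of the encoding: partition i contributes a single set bit
  // at i * RANGE_LEN + bucket(sizes[i]), so the bitmap stores one coarse size
  // bucket per partition. A decoding sketch (rangeLen is a hypothetical local):
  //
  //   int rangeLen = DATA_RANGE_IN_MB.values().length;
  //   for (int bit : partitionStats.toArray()) {
  //     int partition = bit / rangeLen;
  //     DATA_RANGE_IN_MB bucket = DATA_RANGE_IN_MB.values()[bit % rangeLen];
  //   }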
static long ceil(long a, long b) {
return (a + (b - 1)) / b;
}
/**
 * Detailed partition stats: per-partition sizes rounded up to whole MBs.
 *
 * @param sizes actual partition sizes, in bytes
 * @return proto carrying one size-in-MB entry per partition
 */
public static DetailedPartitionStatsProto
getDetailedPartitionStatsForPhysicalOutput(long[] sizes) {
DetailedPartitionStatsProto.Builder builder =
DetailedPartitionStatsProto.newBuilder();
for (int i=0; i<sizes.length; i++) {
// Round the size up, so 1 byte -> sizeInMb == 1.
// Ints.checkedCast throws IllegalArgumentException if the value exceeds
// Integer.MAX_VALUE; that should be fine, since Integer.MAX_VALUE MB is
// on the order of petabytes.
int sizeInMb = Ints.checkedCast(ceil(sizes[i], MB));
builder.addSizeInMb(sizeInMb);
}
return builder.build();
}
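  // For instance, sizes of {0, 1, 1048576, 1048577} bytes are reported as
  // {0, 1, 1, 2} MB: every non-empty partition is rounded up to at least one
  // whole MB by ceil() above.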
public static class FetchStatsLogger {
private final Logger activeLogger;
private final Logger aggregateLogger;
private final AtomicLong logCount = new AtomicLong();
private final AtomicLong compressedSize = new AtomicLong();
private final AtomicLong decompressedSize = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public FetchStatsLogger(Logger activeLogger, Logger aggregateLogger) {
this.activeLogger = activeLogger;
this.aggregateLogger = aggregateLogger;
}
private static StringBuilder toShortString(InputAttemptIdentifier inputAttemptIdentifier, StringBuilder sb) {
sb.append("{");
sb.append(inputAttemptIdentifier.getInputIdentifier());
sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
if (inputAttemptIdentifier.getFetchTypeInfo()
!= InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
}
sb.append("}");
return sb;
}
/**
 * Log an individual fetch-complete event.
 * This log information is used by tez-tools/perf-analyzer/shuffle tools for mining
 * - amount of data transferred from the source to the destination machine
 * - time taken to transfer data from the source to the destination machine
 * - details on DISK/DISK_DIRECT/MEMORY based shuffles
 *
 * @param millis time taken by the fetch, in milliseconds
 * @param bytesCompressed compressed bytes transferred
 * @param bytesDecompressed decompressed size of the transferred data
 * @param outputType DISK/DISK_DIRECT/MEMORY
 * @param srcAttemptIdentifier identifier of the fetched source attempt
 */
public void logIndividualFetchComplete(long millis, long bytesCompressed,
long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
if (activeLogger.isInfoEnabled()) {
long wholeMBs = 0;
long partialMBs = 0;
if (millis != 0) {
// fast math is done using integer math to avoid double-to-string conversion:
// calculate B/s * 100 to preserve two decimal places of MB/s precision, i.e.
// multiply the numerator by 100000 (2^5 * 5^5) and the denominator by MB (2^20),
// then simplify the fraction by factoring out 2^5 to protect against overflow
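// worked example: 1 MB (1048576 bytes) fetched in 1000 ms gives
//   (1048576 * 3125) / (1000 * 32768) = 100 -> wholeMBs = 1, partialMBs = 00,
//   logged below as "1.00 MB/s"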
wholeMBs = (bytesCompressed * 3125) / (millis * 32768);
partialMBs = wholeMBs % 100;
wholeMBs /= 100;
}
StringBuilder sb = new StringBuilder("Completed fetch for attempt: ");
toShortString(srcAttemptIdentifier, sb);
sb.append(" to ");
sb.append(outputType);
sb.append(", csize=");
sb.append(bytesCompressed);
sb.append(", dsize=");
sb.append(bytesDecompressed);
sb.append(", EndTime=");
sb.append(System.currentTimeMillis());
sb.append(", TimeTaken=");
sb.append(millis);
sb.append(", Rate=");
sb.append(wholeMBs);
sb.append(".");
MBPS_FAST_FORMAT.get().format(partialMBs, sb);
sb.append(" MB/s");
activeLogger.info(sb.toString());
} else {
long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime;
synchronized (this) {
currentCount = logCount.incrementAndGet();
currentCompressedSize = compressedSize.addAndGet(bytesCompressed);
currentDecompressedSize = decompressedSize.addAndGet(bytesDecompressed);
currentTotalTime = totalTime.addAndGet(millis);
if (currentCount % 1000 == 0) {
compressedSize.set(0);
decompressedSize.set(0);
totalTime.set(0);
}
}
if (currentCount % 1000 == 0) {
// convert B/ms to MB/s: multiply by 1000 (ms -> s), divide by 2^20 (B -> MB)
double avgRate = currentTotalTime == 0 ? 0
: currentCompressedSize * 1000d / (currentTotalTime * (double) MB);
aggregateLogger.info("Completed {} fetches, stats for last 1000 fetches: "
+ "avg csize: {}, avg dsize: {}, avgTime: {}, avgRate: {}", currentCount,
currentCompressedSize / 1000, currentDecompressedSize / 1000, currentTotalTime / 1000,
MBPS_FORMAT.get().format(avgRate));
}
}
}
}
/**
 * Build {@link org.apache.tez.http.HttpConnectionParams} from configuration.
 *
 * @param conf configuration to read the connection parameters from
 * @return HttpConnectionParams
 */
public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
return TezRuntimeUtils.getHttpConnectionParams(conf);
}
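  // Example (a hedged sketch; the "tez_shuffle" value is illustrative): any
  // configured auxiliary-service id containing "tez" is treated as the Tez
  // shuffle handler by the method below.
  //
  //   Configuration conf = new Configuration();
  //   conf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, "tez_shuffle");
  //   boolean usesTezHandler = ShuffleUtils.isTezShuffleHandler(conf); // true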
public static boolean isTezShuffleHandler(Configuration config) {
return config.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT).
contains("tez");
}
}