/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.tez.common;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.log4j.Appender;
import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Private
public class TezUtilsInternal {
private static final Logger LOG = LoggerFactory.getLogger(TezUtilsInternal.class);
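  /**
   * Reads the user-specified Tez configuration that was serialized as the
   * binary file {@link TezConstants#TEZ_PB_BINARY_CONF_NAME} under
   * {@code baseDir}.
   */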
  public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir)
      throws IOException {
    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
    try (FileInputStream confPBBinaryStream =
        new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME))) {
      confProtoBuilder.mergeFrom(confPBBinaryStream);
    }
    return confProtoBuilder.build();
  }
public static void addUserSpecifiedTezConfiguration(Configuration conf,
List<PlanKeyValuePair> kvPairList) {
if (kvPairList != null && !kvPairList.isEmpty()) {
for (PlanKeyValuePair kvPair : kvPairList) {
conf.set(kvPair.getKey(), kvPair.getValue());
}
}
}
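  /**
   * Compresses the given bytes with raw DEFLATE at {@link Deflater#BEST_SPEED}.
   * The output is intended to be restored via {@link #uncompressBytes(byte[])}.
   * A minimal round-trip sketch (the payload below is illustrative only):
   *
   * <pre>{@code
   * byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
   * byte[] packed = TezUtilsInternal.compressBytes(payload);
   * byte[] restored = TezUtilsInternal.uncompressBytes(packed);
   * // Arrays.equals(payload, restored) is true
   * }</pre>
   */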
public static byte[] compressBytes(byte[] inBytes) throws IOException {
StopWatch sw = new StopWatch().start();
byte[] compressed = compressBytesInflateDeflate(inBytes);
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
+ ", CompressTime: " + sw.now(TimeUnit.MILLISECONDS));
}
return compressed;
}
public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
StopWatch sw = new StopWatch().start();
byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
+ ", UncompressTimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return uncompressed;
}
  private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
    try {
      deflater.setInput(inBytes);
      NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length);
      deflater.finish();
      byte[] buffer = new byte[1024 * 8];
      while (!deflater.finished()) {
        int count = deflater.deflate(buffer);
        bos.write(buffer, 0, count);
      }
      return bos.toByteArray();
    } finally {
      // Release the native zlib resources held by the Deflater.
      deflater.end();
    }
  }
  private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
    Inflater inflater = new Inflater();
    try {
      inflater.setInput(inBytes);
      NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length);
      byte[] buffer = new byte[1024 * 8];
      while (!inflater.finished()) {
        int count;
        try {
          count = inflater.inflate(buffer);
        } catch (DataFormatException e) {
          throw new IOException(e);
        }
        bos.write(buffer, 0, count);
      }
      return bos.toByteArray();
    } finally {
      // Release the native zlib resources held by the Inflater.
      inflater.end();
    }
  }
private static final Pattern pattern = Pattern.compile("\\W");
@Private
public static final int MAX_VERTEX_NAME_LENGTH = 40;
@Private
  public static String cleanVertexName(String vertexName) {
    return sanitizeString(vertexName).substring(0,
        Math.min(vertexName.length(), MAX_VERTEX_NAME_LENGTH));
  }
  private static String sanitizeString(String srcString) {
    Matcher matcher = pattern.matcher(srcString);
    return matcher.replaceAll("_"); // Names starting with a digit are still allowed for now
  }
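  /**
   * Redirects the container log appender to {@code base + "_" + addend}
   * (or just the base name when the addend is empty). This only takes effect
   * when the root log4j logger carries a {@link TezContainerLogAppender}
   * registered under {@link TezConstants#TEZ_CONTAINER_LOGGER_NAME}; otherwise
   * a warning is logged and the logging configuration is left unchanged.
   */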
public static void updateLoggers(String addend) throws FileNotFoundException {
LOG.info("Redirecting log file based on addend: " + addend);
Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(
TezConstants.TEZ_CONTAINER_LOGGER_NAME);
if (appender != null) {
if (appender instanceof TezContainerLogAppender) {
TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
claAppender.setLogFileName(constructLogFileName(
TezConstants.TEZ_CONTAINER_LOG_FILE_NAME, addend));
claAppender.activateOptions();
} else {
LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
+ TezContainerLogAppender.class.getName() + " to reconfigure the logger output");
}
} else {
LOG.warn("Not configured with appender named: " + TezConstants.TEZ_CONTAINER_LOGGER_NAME
+ ". Cannot reconfigure logger output");
}
}
private static String constructLogFileName(String base, String addend) {
if (addend == null || addend.isEmpty()) {
return base;
} else {
return base + "_" + addend;
}
}
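  /**
   * Deserializes a {@link BitSet} from the layout written by
   * {@link #toByteArray(BitSet)}: bit {@code i} is stored at bit position
   * {@code i % 8} of {@code bytes[bytes.length - 1 - i / 8]}. A null input
   * yields an empty set. Round-trip sketch:
   *
   * <pre>{@code
   * BitSet bits = new BitSet();
   * bits.set(3);
   * bits.set(9);
   * BitSet copy = TezUtilsInternal.fromByteArray(TezUtilsInternal.toByteArray(bits));
   * // copy.equals(bits) is true
   * }</pre>
   */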
public static BitSet fromByteArray(byte[] bytes) {
if (bytes == null) {
return new BitSet();
}
BitSet bits = new BitSet();
for (int i = 0; i < bytes.length * 8; i++) {
if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) > 0) {
bits.set(i);
}
}
return bits;
}
public static byte[] toByteArray(BitSet bits) {
if (bits == null) {
return null;
}
byte[] bytes = new byte[(bits.length() + 7) / 8];
for (int i = 0; i < bits.length(); i++) {
if (bits.get(i)) {
bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8);
}
}
return bytes;
}
  /**
   * Converts a DAGPlan to text, masking sensitive information such as
   * credentials.
   *
   * @param dagPlan the DAG plan to render as text
   * @return a string representation of the DAG plan with sensitive information removed
   * @throws IOException if a field cannot be printed
   */
public static String convertDagPlanToString(DAGProtos.DAGPlan dagPlan) throws IOException {
StringBuilder sb = new StringBuilder();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : dagPlan.getAllFields().entrySet()) {
if (entry.getKey().getNumber() != DAGProtos.DAGPlan.CREDENTIALS_BINARY_FIELD_NUMBER) {
TextFormat.printField(entry.getKey(), entry.getValue(), sb);
} else {
        Credentials credentials =
            DagTypeConverters.convertByteStringToCredentials(dagPlan.getCredentialsBinary());
        TextFormat.printField(entry.getKey(),
            ByteString.copyFrom(TezCommonUtils.getCredentialsInfo(credentials, "dag").getBytes(
                StandardCharsets.UTF_8)), sb);
}
}
return sb.toString();
}
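  // The two helpers below map between the plugin-facing TaskAttemptEndReason
  // and the internal TaskAttemptTerminationCause. Note that the reverse
  // mapping is lossy: most termination causes collapse to OTHER.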
public static TaskAttemptTerminationCause fromTaskAttemptEndReason(
TaskAttemptEndReason taskAttemptEndReason) {
if (taskAttemptEndReason == null) {
return null;
}
switch (taskAttemptEndReason) {
case COMMUNICATION_ERROR:
return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
case EXECUTOR_BUSY:
return TaskAttemptTerminationCause.SERVICE_BUSY;
case INTERNAL_PREEMPTION:
return TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
case EXTERNAL_PREEMPTION:
return TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
case APPLICATION_ERROR:
return TaskAttemptTerminationCause.APPLICATION_ERROR;
case FRAMEWORK_ERROR:
return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
case NODE_FAILED:
return TaskAttemptTerminationCause.NODE_FAILED;
case CONTAINER_EXITED:
return TaskAttemptTerminationCause.CONTAINER_EXITED;
      case OTHER:
      default:
        return TaskAttemptTerminationCause.UNKNOWN_ERROR;
}
}
public static TaskAttemptEndReason toTaskAttemptEndReason(TaskAttemptTerminationCause cause) {
// TODO Post TEZ-2003. Consolidate these states, and mappings.
if (cause == null) {
return null;
}
switch (cause) {
case COMMUNICATION_ERROR:
return TaskAttemptEndReason.COMMUNICATION_ERROR;
case SERVICE_BUSY:
return TaskAttemptEndReason.EXECUTOR_BUSY;
case INTERNAL_PREEMPTION:
return TaskAttemptEndReason.INTERNAL_PREEMPTION;
case EXTERNAL_PREEMPTION:
return TaskAttemptEndReason.EXTERNAL_PREEMPTION;
case APPLICATION_ERROR:
return TaskAttemptEndReason.APPLICATION_ERROR;
case FRAMEWORK_ERROR:
return TaskAttemptEndReason.FRAMEWORK_ERROR;
case NODE_FAILED:
return TaskAttemptEndReason.NODE_FAILED;
case CONTAINER_EXITED:
return TaskAttemptEndReason.CONTAINER_EXITED;
case INTERRUPTED_BY_SYSTEM:
case INTERRUPTED_BY_USER:
case UNKNOWN_ERROR:
case TERMINATED_BY_CLIENT:
case TERMINATED_AT_SHUTDOWN:
case TERMINATED_INEFFECTIVE_SPECULATION:
case TERMINATED_EFFECTIVE_SPECULATION:
case TERMINATED_ORPHANED:
case INPUT_READ_ERROR:
case OUTPUT_WRITE_ERROR:
case OUTPUT_LOST:
case TASK_HEARTBEAT_ERROR:
case CONTAINER_LAUNCH_FAILED:
case CONTAINER_STOPPED:
case NODE_DISK_ERROR:
default:
return TaskAttemptEndReason.OTHER;
}
}
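  /**
   * Parses a set of enum constants from configuration. Values are read from
   * {@code confName} and, when that key is unset, from the comma-separated
   * {@code defaultValues}; returns null if neither supplies any names. A
   * usage sketch (the key name below is hypothetical):
   *
   * <pre>{@code
   * Set<TimeUnit> units = TezUtilsInternal.getEnums(conf,
   *     "example.time.units", TimeUnit.class, "SECONDS,MINUTES");
   * }</pre>
   */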
public static <T extends Enum<T>> Set<T> getEnums(Configuration conf, String confName,
Class<T> enumType, String defaultValues) {
String[] names = conf.getStrings(confName);
if (names == null) {
names = StringUtils.getStrings(defaultValues);
}
if (names == null) {
return null;
}
Set<T> enums = new HashSet<>();
for (String name : names) {
enums.add(Enum.valueOf(enumType, name));
}
return enums;
}
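  // The overloads below tag the Hadoop caller context with a typed prefix
  // (tez_ta, tez_v, tez_dag, tez_app) so that downstream audit logs can
  // attribute activity to the originating Tez entity.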
@Private
public static void setHadoopCallerContext(HadoopShim hadoopShim, TezTaskAttemptID attemptID) {
hadoopShim.setHadoopCallerContext("tez_ta:" + attemptID.toString());
}
@Private
public static void setHadoopCallerContext(HadoopShim hadoopShim, TezVertexID vertexID) {
hadoopShim.setHadoopCallerContext("tez_v:" + vertexID.toString());
}
@Private
public static void setHadoopCallerContext(HadoopShim hadoopShim, TezDAGID dagID) {
hadoopShim.setHadoopCallerContext("tez_dag:" + dagID.toString());
}
@Private
public static void setHadoopCallerContext(HadoopShim hadoopShim, ApplicationId appID) {
hadoopShim.setHadoopCallerContext("tez_app:" + appID.toString());
}
@Private
  public static void setSecurityUtilConfigration(Logger log, Configuration conf) {
    // Use reflection to invoke SecurityUtil.setConfiguration when it is available;
    // Hadoop 2.6.0 does not support it, and it is only available from 2.9.0 onwards.
    // Remove this once the minimum supported Hadoop version has the above method.
Class<SecurityUtil> clz = SecurityUtil.class;
try {
Method method = clz.getMethod("setConfiguration", Configuration.class);
method.invoke(null, conf);
} catch (NoSuchMethodException e) {
// This is not available, so ignore it.
} catch (SecurityException | IllegalAccessException | IllegalArgumentException |
InvocationTargetException e) {
log.warn("Error invoking SecurityUtil.setConfiguration: ", e);
throw new TezUncheckedException("Error invoking SecurityUtil.setConfiguration", e);
}
}
}