/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Hadoop utility methods.
*/
public class HadoopUtils {
/** Staging constant. */
private static final String STAGING_CONSTANT = ".staging";
/** Old mapper class attribute. */
private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
/** Old reducer class attribute. */
private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
/**
* Constructor.
*/
private HadoopUtils() {
// No-op.
}
/**
* Wraps native split.
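* <p>A minimal usage sketch ({@code FileSplit} here is {@code org.apache.hadoop.mapreduce.lib.input.FileSplit};
* the arguments are illustrative):
* <pre>{@code
* String[] hosts = {"host1"};
* FileSplit split = new FileSplit(new Path("/input/part-0"), 0, 1024, hosts);
* HadoopSplitWrapper wrapped = wrapSplit(0, split, hosts);
* }</pre>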
*
* @param id Split ID.
* @param split Split.
* @param hosts Hosts.
* @return Wrapped split.
* @throws IOException If failed.
*/
public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
ByteArrayOutputStream arr = new ByteArrayOutputStream();
ObjectOutput out = new ObjectOutputStream(arr);
assert split instanceof Writable;
((Writable)split).write(out);
out.flush();
return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
}
/**
* Unwraps native split.
*
* @param o Wrapper.
* @return Split.
*/
public static Object unwrapSplit(HadoopSplitWrapper o) {
try {
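// Instantiate the split class reflectively and let it restore its state via Writable.readFields().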
Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
return w;
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
/**
* Converts Ignite job status to Hadoop job status.
*
* @param status Ignite job status.
* @param conf Hadoop configuration used to resolve the job file path.
* @return Hadoop job status.
*/
public static JobStatus status(HadoopJobStatus status, Configuration conf) {
JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
float setupProgress = 0;
float mapProgress = 0;
float reduceProgress = 0;
float cleanupProgress = 0;
JobStatus.State state = JobStatus.State.RUNNING;
switch (status.jobPhase()) {
case PHASE_SETUP:
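// No finer-grained progress is tracked during setup, so report a fixed mid-range placeholder.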
setupProgress = 0.42f;
break;
case PHASE_MAP:
setupProgress = 1;
mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
break;
case PHASE_REDUCE:
setupProgress = 1;
mapProgress = 1;
if (status.totalReducerCnt() > 0)
reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
else
reduceProgress = 1f;
break;
case PHASE_CANCELLING:
case PHASE_COMPLETE:
if (!status.isFailed()) {
setupProgress = 1;
mapProgress = 1;
reduceProgress = 1;
cleanupProgress = 1;
state = JobStatus.State.SUCCEEDED;
}
else
state = JobStatus.State.FAILED;
break;
default:
assert false;
}
return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
}
/**
* Gets staging area directory.
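* <p>With Hadoop's default {@code yarn.app.mapreduce.am.staging-dir} of {@code /tmp/hadoop-yarn/staging}
* this resolves to {@code /tmp/hadoop-yarn/staging/<user>/.staging}.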
*
* @param conf Configuration.
* @param usr User.
* @return Staging area directory.
*/
public static Path stagingAreaDir(Configuration conf, String usr) {
return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
}
/**
* Gets job file.
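* <p>Resolves to {@code <stagingAreaDir>/<jobId>/job.xml}, where {@code job.xml} is {@link MRJobConfig#JOB_CONF_FILE}.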
*
* @param conf Configuration.
* @param usr User.
* @param jobId Job ID.
* @return Job file.
*/
public static Path jobFile(Configuration conf, String usr, JobID jobId) {
return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
}
/**
* Checks that the given attribute is not set in the configuration.
*
* @param cfg Configuration to check.
* @param attr Attribute name.
* @param msg Mode description for the exception message.
* @throws IgniteCheckedException If attribute is set.
*/
public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
if (cfg.get(attr) != null)
throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
}
/**
* Creates JobInfo from Hadoop configuration.
*
* @param cfg Hadoop configuration.
* @param credentials Credentials.
* @return Job info.
* @throws IgniteCheckedException If failed.
*/
public static HadoopDefaultJobInfo createJobInfo(Configuration cfg, byte[] credentials)
throws IgniteCheckedException {
JobConf jobConf = new JobConf(cfg);
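// A combiner may be configured through either the old or the new API property.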
boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
|| jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
int numReduces = jobConf.getNumReduceTasks();
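// Infer the API flavor when not set explicitly: absence of the old-style mapper class implies the new API.
// The checks below mirror Hadoop's own job submission validation of mutually exclusive old/new API settings.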
jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
if (jobConf.getUseNewMapper()) {
String mode = "new map API";
ensureNotSet(jobConf, "mapred.input.format.class", mode);
ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
if (numReduces != 0)
ensureNotSet(jobConf, "mapred.partitioner.class", mode);
else
ensureNotSet(jobConf, "mapred.output.format.class", mode);
}
else {
String mode = "map compatibility";
ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
if (numReduces != 0)
ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
else
ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
}
if (numReduces != 0) {
jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
if (jobConf.getUseNewReducer()) {
String mode = "new reduce API";
ensureNotSet(jobConf, "mapred.output.format.class", mode);
ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
}
else {
String mode = "reduce compatibility";
ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
}
}
Map<String, String> props = new HashMap<>();
for (Map.Entry<String, String> entry : jobConf)
props.put(entry.getKey(), entry.getValue());
return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props,
credentials);
}
/**
* Creates a new {@link IgniteCheckedException} with the original exception serialized into its message string.
* This is needed to transfer the error outside the current class loader.
*
* @param e Original exception.
* @return New exception carrying the original stack trace as text.
*/
public static IgniteCheckedException transformException(Throwable e) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(os, true));
return new IgniteCheckedException(os.toString());
}
/**
* Returns work directory for job execution.
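* <p>Resolves to {@code <workDir>/hadoop/node-<locNodeId>/job_<jobId>}.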
*
* @param workDir Work directory.
* @param locNodeId Local node ID.
* @param jobId Job ID.
* @return Working directory for job.
* @throws IgniteCheckedException If failed.
*/
public static File jobLocalDir(String workDir, UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
return new File(new File(U.resolveWorkDirectory(workDir, "hadoop", false),
"node-" + locNodeId), "job_" + jobId);
}
/**
* Returns subdirectory of job working directory for task execution.
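* <p>Resolves to {@code <jobLocalDir>/<taskType>_<taskNumber>_<attempt>}.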
*
* @param workDir Work directory.
* @param locNodeId Local node ID.
* @param info Task info.
* @return Working directory for task.
* @throws IgniteCheckedException If failed.
*/
public static File taskLocalDir(String workDir, UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
File jobLocDir = jobLocalDir(workDir, locNodeId, info.jobId());
return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
}
/**
* Creates {@link Configuration} in the correct class loader context to avoid caching
* an inappropriate class loader in the Configuration object.
* @return New instance of {@link Configuration}.
*/
public static Configuration safeCreateConfiguration() {
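// Configuration remembers the context class loader of the creating thread, so pin it to Hadoop's own loader.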
final ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(Configuration.class.getClassLoader());
try {
return new Configuration();
}
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
}
/**
* Internal comparison routine: lexicographically compares an on-heap byte array with an off-heap buffer,
* treating bytes as unsigned.
*
* @param buf1 First buffer (on-heap byte array).
* @param len1 Number of significant bytes in the first buffer.
* @param ptr2 Address of the second buffer (off-heap).
* @param len2 Number of significant bytes in the second buffer.
* @return Negative, zero or positive value, as usual for comparators.
*/
@SuppressWarnings("SuspiciousNameCombination")
public static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
int minLength = Math.min(len1, len2);
int minWords = minLength / Longs.BYTES;
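// Compare 8 bytes at a time: on a 64-bit JVM this is much faster than a byte-by-byte loop.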
for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
long rw = GridUnsafe.getLong(ptr2 + i);
long diff = lw ^ rw;
if (diff != 0) {
if (GridUnsafe.BIG_ENDIAN)
return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
// Use binary search
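// On a little-endian machine the first differing byte in memory order is the least significant
// non-zero byte of the XOR; locate its bit offset n, then compare those two bytes as unsigned values.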
int n = 0;
int y;
int x = (int) diff;
if (x == 0) {
x = (int) (diff >>> 32);
n = 32;
}
y = x << 16;
if (y == 0)
n += 16;
else
x = y;
y = x << 8;
if (y == 0)
n += 8;
return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
}
}
// The epilogue to cover the last (minLength % 8) elements.
for (int i = minWords * Longs.BYTES; i < minLength; i++) {
int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
if (res != 0)
return res;
}
return len1 - len2;
}
/**
* Deserializes a Hadoop {@link Writable} object from a byte array.
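* <p>For example, restoring {@link Credentials} the way {@link #createUGI} does ({@code credentialsBytes}
* is assumed given):
* <pre>{@code
* Credentials creds = new Credentials();
* deserialize(creds, credentialsBytes);
* }</pre>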
*
* @param writable Writable object to deserialize into.
* @param bytes Byte array to deserialize from.
* @throws IOException If deserialization failed.
*/
public static void deserialize(Writable writable, byte[] bytes) throws IOException {
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
writable.readFields(dataIn);
dataIn.close();
}
/**
* Creates {@link UserGroupInformation} for the specified user and credentials.
*
* @param user User name.
* @param credentialsBytes Serialized {@link Credentials}.
* @return Created UserGroupInformation.
* @throws IOException If credentials deserialization failed.
*/
public static UserGroupInformation createUGI(String user, byte[] credentialsBytes) throws IOException {
Credentials credentials = new Credentials();
HadoopUtils.deserialize(credentials, credentialsBytes);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.addCredentials(credentials);
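// If delegation tokens are present, mark the UGI as token-authenticated so Hadoop services accept them.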
if (credentials.numberOfTokens() > 0)
ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN);
return ugi;
}
}