/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl.v2;
import java.io.DataInput;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.hadoop.io.BytesWritablePartiallyRawComparator;
import org.apache.ignite.hadoop.io.PartiallyRawComparator;
import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
import org.apache.ignite.internal.processors.hadoop.HadoopTask;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.unwrapSplit;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
/**
* Context for task execution.
*/
public class HadoopV2TaskContext extends HadoopTaskContext {
/** Whether the running Hadoop version supports a combiner key grouping comparator. */
private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
/** Lazy per-user file system cache used by the Hadoop task. */
private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
= createHadoopLazyConcurrentMap();
/** Default partial comparator mappings. */
private static final Map<String, String> PARTIAL_COMPARATORS = new HashMap<>();
/**
* This method is called via reflection upon job finish with the class loader of each task.
* It cleans up all file systems created for the particular task.
* Each class loader uses its own instance of <code>fsMap</code> since the class loaders
* are different.
*
* @throws IgniteCheckedException On error.
*/
public static void close() throws IgniteCheckedException {
fsMap.close();
}
/** Flag is set if new context-object code is used for running the mapper. */
private final boolean useNewMapper;
/** Flag is set if new context-object code is used for running the reducer. */
private final boolean useNewReducer;
/** Flag is set if new context-object code is used for running the combiner. */
private final boolean useNewCombiner;
/** Hadoop job context. */
private final JobContextImpl jobCtx;
/** Flag set when the task is cancelled. */
private volatile boolean cancelled;
/** Current task. */
private volatile HadoopTask task;
/** Local node ID. */
private final UUID locNodeId;
/** Counters for task. */
private final HadoopCounters cntrs = new HadoopCountersImpl();
static {
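// Probe for JobContext.getCombinerKeyGroupingComparator() to detect whether this Hadoop version
// supports a separate grouping comparator for the combiner.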
boolean ok;
try {
JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
ok = true;
}
catch (NoSuchMethodException ignore) {
ok = false;
}
COMBINE_KEY_GROUPING_SUPPORTED = ok;
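// Register default partially raw comparators for common Writable key types.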
PARTIAL_COMPARATORS.put(BytesWritable.class.getName(), BytesWritablePartiallyRawComparator.class.getName());
PARTIAL_COMPARATORS.put(Text.class.getName(), TextPartiallyRawComparator.class.getName());
}
/**
* @param taskInfo Task info.
* @param job Job.
* @param jobId Job ID.
* @param locNodeId Local node ID.
* @param jobConfDataInput Data input to read the {@code JobConf} from.
* @throws IgniteCheckedException If failed to read or initialize the job configuration.
*/
public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job, HadoopJobId jobId,
@Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
super(taskInfo, job);
this.locNodeId = locNodeId;
// Before creating the JobConf instance we must set the new context class loader.
ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
try {
JobConf jobConf = new JobConf();
try {
jobConf.readFields(jobConfDataInput);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
// For map-reduce jobs prefer local writes.
jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
initializePartiallyRawComparator(jobConf);
jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
useNewMapper = jobConf.getUseNewMapper();
useNewReducer = jobConf.getUseNewReducer();
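// The new-API combiner is used when no old-API (mapred) combiner class is configured.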
useNewCombiner = jobConf.getCombinerClass() == null;
}
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
}
/** {@inheritDoc} */
@Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
return cntrs.counter(grp, name, cls);
}
/** {@inheritDoc} */
@Override public HadoopCounters counters() {
return cntrs;
}
/**
* Creates an appropriate task from the current task info.
*
* @return Task.
*/
private HadoopTask createTask() {
boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
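// Choose between the v1 (mapred) and v2 (mapreduce) task implementation depending on which API the job uses.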
switch (taskInfo().type()) {
case SETUP:
return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
case MAP:
return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
case REDUCE:
return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
new HadoopV1ReduceTask(taskInfo(), true);
case COMBINE:
return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
new HadoopV1ReduceTask(taskInfo(), false);
case COMMIT:
case ABORT:
return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
new HadoopV1CleanupTask(taskInfo(), isAbort);
default:
return null;
}
}
/** {@inheritDoc} */
@Override public void run() throws IgniteCheckedException {
ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
try {
try {
task = createTask();
}
catch (Throwable e) {
if (e instanceof Error)
throw e;
throw transformException(e);
}
if (cancelled)
throw new HadoopTaskCancelledException("Task cancelled.");
try {
task.run(this);
}
catch (Throwable e) {
if (e instanceof Error)
throw e;
throw transformException(e);
}
}
finally {
task = null;
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
}
/** {@inheritDoc} */
@Override public void cancel() {
cancelled = true;
HadoopTask t = task;
if (t != null)
t.cancel();
}
/** {@inheritDoc} */
@Override public void prepareTaskEnvironment() throws IgniteCheckedException {
File locDir;
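// Map and reduce tasks run in a per-task local directory; all other task types use the job-local directory.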
switch (taskInfo().type()) {
case MAP:
case REDUCE:
job().prepareTaskEnvironment(taskInfo());
locDir = taskLocalDir(job.igniteWorkDirectory(), locNodeId, taskInfo());
break;
default:
locDir = jobLocalDir(job.igniteWorkDirectory(), locNodeId, taskInfo().jobId());
}
ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
try {
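// Touch the default file system first, presumably so it is initialized and cached while the task class loader is in effect.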
FileSystem.get(jobConf());
LocalFileSystem locFs = FileSystem.getLocal(jobConf());
locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
}
catch (Throwable e) {
if (e instanceof Error)
throw (Error)e;
throw transformException(e);
}
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
}
/** {@inheritDoc} */
@Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
job().cleanupTaskEnvironment(taskInfo());
}
/**
* Creates Hadoop attempt ID.
*
* @return Attempt ID.
*/
public TaskAttemptID attemptId() {
TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
return new TaskAttemptID(tid, taskInfo().attempt());
}
/**
* @param type Task type.
* @return Hadoop task type.
*/
private TaskType taskType(HadoopTaskType type) {
switch (type) {
case SETUP:
return TaskType.JOB_SETUP;
case MAP:
case COMBINE:
return TaskType.MAP;
case REDUCE:
return TaskType.REDUCE;
case COMMIT:
case ABORT:
return TaskType.JOB_CLEANUP;
default:
return null;
}
}
/**
* Gets job configuration of the task.
*
* @return Job configuration.
*/
public JobConf jobConf() {
return jobCtx.getJobConf();
}
/**
* Gets job context of the task.
*
* @return Job context.
*/
public JobContextImpl jobContext() {
return jobCtx;
}
/** {@inheritDoc} */
@Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
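// An explicitly configured old-API (mapred) partitioner takes precedence over the new-API one.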
Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
if (partClsOld != null)
return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
try {
return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
}
catch (ClassNotFoundException e) {
throw new IgniteCheckedException(e);
}
}
/**
* Gets serializer for specified class.
*
* @param cls Class.
* @param jobConf Job configuration.
* @return Appropriate serializer.
*/
@SuppressWarnings("unchecked")
private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
A.notNull(cls, "cls");
SerializationFactory factory = new SerializationFactory(jobConf);
Serialization<?> serialization = factory.getSerialization(cls);
if (serialization == null)
throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
if (serialization.getClass() == WritableSerialization.class)
return new HadoopWritableSerialization((Class<? extends Writable>)cls);
return new HadoopSerializationWrapper(serialization, cls);
}
/** {@inheritDoc} */
@Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
}
/** {@inheritDoc} */
@Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Comparator<Object> sortComparator() {
return (Comparator<Object>)jobCtx.getSortComparator();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator() {
Class cls = jobCtx.getJobConf().getClass(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), null);
if (cls == null)
return null;
Object res = ReflectionUtils.newInstance(cls, jobConf());
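// Use the comparator directly if it is offheap-aware, otherwise wrap it in a delegating adapter.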
if (res instanceof PartiallyOffheapRawComparatorEx)
return (PartiallyOffheapRawComparatorEx)res;
else
return new HadoopV2DelegatingPartiallyOffheapRawComparator<>((PartiallyRawComparator)res);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Comparator<Object> groupComparator() {
Comparator<?> res;
switch (taskInfo().type()) {
case COMBINE:
res = COMBINE_KEY_GROUPING_SUPPORTED ?
jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
break;
case REDUCE:
res = jobContext().getGroupingComparator();
break;
default:
return null;
}
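// A separate group comparator is only needed when it differs from the sort comparator;
// otherwise grouping coincides with sorting.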
if (res != null && res.getClass() != sortComparator().getClass())
return (Comparator<Object>)res;
return null;
}
/**
* @param split Split.
* @return Native Hadoop split.
* @throws IgniteCheckedException If failed.
*/
public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
if (split instanceof HadoopExternalSplit)
return readExternalSplit((HadoopExternalSplit)split);
if (split instanceof HadoopSplitWrapper)
return unwrapSplit((HadoopSplitWrapper)split);
throw new IllegalStateException("Unknown split: " + split);
}
/**
* @param split External split.
* @return Native input split.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
FileSystem fs;
try {
fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
try (
FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
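// At the given offset the job split file stores the split class name followed by the serialized split.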
in.seek(split.offset());
String clsName = Text.readString(in);
Class<?> cls = jobConf().getClassByName(clsName);
assert cls != null;
Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
Deserializer deserializer = serialization.getDeserializer(cls);
deserializer.open(in);
Object res = deserializer.deserialize(null);
deserializer.close();
assert res != null;
return res;
}
catch (IOException | ClassNotFoundException e) {
throw new IgniteCheckedException(e);
}
}
/** {@inheritDoc} */
@Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
if (job.info().credentials() == null) {
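// No explicit credentials: run as the job owner, or directly when the current UGI user already matches.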
String user = job.info().user();
user = IgfsUtils.fixUserName(user);
assert user != null;
String ugiUser;
try {
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
assert currUser != null;
ugiUser = currUser.getShortUserName();
}
catch (IOException ioe) {
throw new IgniteCheckedException(ioe);
}
try {
if (F.eq(user, ugiUser))
// If the current UGI context user is the same, make a direct call:
return c.call();
else {
UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
return ugi.doAs(new PrivilegedExceptionAction<T>() {
@Override public T run() throws Exception {
return c.call();
}
});
}
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
else {
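// Credentials are present: create a UGI for the job owner with these credentials and execute under it.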
try {
UserGroupInformation ugi = HadoopUtils.createUGI(job.info().user(), job.info().credentials());
return ugi.doAs(new PrivilegedExceptionAction<T>() {
@Override public T run() throws Exception {
return c.call();
}
});
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
}
/**
* Tries to initialize a partially raw comparator for the job.
*
* @param conf Configuration.
*/
private void initializePartiallyRawComparator(JobConf conf) {
String clsName = conf.get(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), null);
if (clsName == null) {
Class keyCls = conf.getMapOutputKeyClass();
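// Walk up the key class hierarchy looking for a registered default partial comparator.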
while (keyCls != null) {
clsName = PARTIAL_COMPARATORS.get(keyCls.getName());
if (clsName != null) {
conf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), clsName);
break;
}
keyCls = keyCls.getSuperclass();
}
}
}
}