| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.hadoop.impl.v2; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.Method; |
| import java.net.URI; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ConcurrentMap; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobContextImpl; |
| import org.apache.hadoop.mapred.JobID; |
| import org.apache.hadoop.mapreduce.JobSubmissionFiles; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.split.JobSplit; |
| import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.hadoop.HadoopInputSplit; |
| import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; |
| import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; |
| import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; |
| import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit; |
| import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; |
| import org.apache.ignite.internal.processors.hadoop.HadoopHelper; |
| import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; |
| import org.apache.ignite.internal.processors.hadoop.HadoopJobId; |
| import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; |
| import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; |
| import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; |
| import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; |
| import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; |
| import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; |
| import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils; |
| import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap; |
| import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.JOB_SHARED_CLASSLOADER; |
| import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir; |
| import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir; |
| import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException; |
| import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey; |
| import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap; |
| import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching; |
| |
| /** |
| * Hadoop job implementation for v2 API. |
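 * <p>
 * A minimal lifecycle sketch (identifiers such as {@code jobId}, {@code jobInfo}, {@code log},
 * {@code helper} and {@code locNodeId} are assumptions for illustration):
 * <pre>{@code
 * HadoopV2Job job = new HadoopV2Job(jobId, jobInfo, log, null, helper);
 *
 * job.initialize(false, locNodeId);
 *
 * try {
 *     Collection<HadoopInputSplit> splits = job.input();
 * }
 * finally {
 *     job.dispose(false);
 * }
 * }</pre>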
| */ |
| public class HadoopV2Job extends HadoopJobEx { |
| /** */ |
| private final JobConf jobConf; |
| |
| /** */ |
| private final JobContextImpl jobCtx; |
| |
| /** */ |
| private final HadoopHelper helper; |
| |
| /** Hadoop job ID. */ |
| private final HadoopJobId jobId; |
| |
| /** Job info. */ |
| protected final HadoopJobInfo jobInfo; |
| |
| /** Native library names. */ |
| private final String[] libNames; |
| |
| /** */ |
| private final JobID hadoopJobID; |
| |
| /** */ |
| private final HadoopV2JobResourceManager rsrcMgr; |
| |
| /** */ |
| private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = |
| new ConcurrentHashMap<>(); |
| |
    /** Pool of task context classes, each carrying its own class loading environment. */
| private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); |
| |
    /** All task context classes ever created for this job. */
| private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); |
| |
| /** File system cache map. */ |
| private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Shared class loader. */ |
| private volatile HadoopClassLoader sharedClsLdr; |
| |
    /** Local node ID. */
| private volatile UUID locNodeId; |
| |
| /** Serialized JobConf. */ |
| private volatile byte[] jobConfData; |
| |
| /** |
| * Constructor. |
| * |
| * @param jobId Job ID. |
| * @param jobInfo Job info. |
| * @param log Logger. |
| * @param libNames Optional additional native library names. |
| * @param helper Hadoop helper. |
| */ |
| public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, |
| @Nullable String[] libNames, HadoopHelper helper) { |
| assert jobId != null; |
| assert jobInfo != null; |
| |
| this.jobId = jobId; |
| this.jobInfo = jobInfo; |
| this.libNames = libNames; |
| this.helper = helper; |
| this.log = log; |
| |
| ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); |
| |
| try { |
| hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); |
| |
| jobConf = new JobConf(); |
| |
| HadoopFileSystemsUtils.setupFileSystems(jobConf); |
| |
| for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) |
| jobConf.set(e.getKey(), e.getValue()); |
| |
| jobCtx = new JobContextImpl(jobConf, hadoopJobID); |
| |
| rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); |
| } |
| finally { |
| HadoopCommonUtils.restoreContextClassLoader(oldLdr); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public HadoopJobId id() { |
| return jobId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public HadoopJobInfo info() { |
| return jobInfo; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<HadoopInputSplit> input() { |
| ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf.getClassLoader()); |
| |
| try { |
| String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR); |
| |
            if (jobDirPath == null) { // The job was probably not submitted by a Hadoop client.
                // Assume that the needed classes are available and try to generate input splits ourselves.
| if (jobConf.getUseNewMapper()) |
| return HadoopV2Splitter.splitJob(jobCtx); |
| else |
| return HadoopV1Splitter.splitJob(jobConf); |
| } |
| |
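            // The job directory is set: the job was submitted by a Hadoop client,
            // so read the pre-computed split meta info from the staging directory.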
| Path jobDir = new Path(jobDirPath); |
| |
| try { |
| FileSystem fs = fileSystem(jobDir.toUri(), jobConf); |
| |
| JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, |
| jobDir); |
| |
| if (F.isEmpty(metaInfos)) |
| throw new IgniteCheckedException("No input splits found."); |
| |
| Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir); |
| |
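                // Each split is stored at the offset recorded in its meta info:
                // seek there, read the split class name, then the split payload.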
| try (FSDataInputStream in = fs.open(splitsFile)) { |
| Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length); |
| |
| for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) { |
| long off = metaInfo.getStartOffset(); |
| |
| String[] hosts = metaInfo.getLocations(); |
| |
| in.seek(off); |
| |
| String clsName = Text.readString(in); |
| |
| HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts); |
| |
| if (block == null) |
| block = HadoopV2Splitter.readFileBlock(clsName, in, hosts); |
| |
| res.add(block != null ? block : new HadoopExternalSplit(hosts, off)); |
| } |
| |
| return res; |
| } |
| } |
| catch (Throwable e) { |
| if (e instanceof Error) |
| throw (Error)e; |
| else |
| throw transformException(e); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| HadoopCommonUtils.restoreContextClassLoader(oldLdr); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"unchecked"}) |
| @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { |
| T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); |
| |
| GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId); |
| |
| if (fut != null) |
| return fut.get(); |
| |
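        // Publish the future first, so concurrent requests for the same task
        // wait for a single context instance instead of creating their own.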
| GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); |
| |
| if (old != null) |
| return old.get(); |
| |
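        // Try to reuse a pooled task context class together with its class loading environment.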
| Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll(); |
| |
| try { |
| if (cls == null) { |
                // If there is no pooled class, then load a new one.
                // Note that the class loader is identified by the task it was initially created for,
                // but later it may be reused for other tasks.
| HadoopClassLoader ldr = sharedClsLdr != null ? |
| sharedClsLdr : createClassLoader(HadoopClassLoader.nameForTask(info, false)); |
| |
| cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); |
| |
| fullCtxClsQueue.add(cls); |
| } |
| |
| Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJobEx.class, |
| HadoopJobId.class, UUID.class, DataInput.class); |
| |
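        // Lazily serialize the job configuration once (double-checked locking on jobConf);
        // the resulting bytes are shared by all task contexts of this job.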
| if (jobConfData == null) |
| synchronized (jobConf) { |
| if (jobConfData == null) { |
| ByteArrayOutputStream buf = new ByteArrayOutputStream(); |
| |
| jobConf.write(new DataOutputStream(buf)); |
| |
| jobConfData = buf.toByteArray(); |
| } |
| } |
| |
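        // Instantiate the context reflectively, so that it is bound to the task class loader.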
| HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, |
| new DataInputStream(new ByteArrayInputStream(jobConfData))); |
| |
| fut.onDone(res); |
| |
| return res; |
| } |
| catch (Throwable e) { |
| IgniteCheckedException te = transformException(e); |
| |
| fut.onDone(te); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| |
| throw te; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void initialize(final boolean external, final UUID locNodeId) throws IgniteCheckedException { |
| assert locNodeId != null; |
| |
| this.locNodeId = locNodeId; |
| |
| ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); |
| |
| try { |
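            // Prepare the job environment, impersonating the submitting user when credentials are present.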
| if (jobInfo.credentials() == null) |
| rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId)); |
| else { |
| UserGroupInformation ugi = HadoopUtils.createUGI(jobInfo.user(), jobInfo.credentials()); |
| |
| try { |
| ugi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override public Void run() throws Exception { |
| rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, |
| jobId)); |
| |
| return null; |
| } |
| }); |
| } |
| catch (IOException | InterruptedException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| |
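            // With a shared class loader all tasks of the job run in a single class loading
            // environment and therefore see the same static state.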
| if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) { |
                U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + " job property is set to true; " +
                    "please disable it if job tasks rely on mutable static state.");
| |
| sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId)); |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| finally { |
| HadoopCommonUtils.restoreContextClassLoader(oldLdr); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("ThrowFromFinallyBlock") |
| @Override public void dispose(boolean external) throws IgniteCheckedException { |
| try { |
| if (rsrcMgr != null && !external) { |
| File jobLocDir = jobLocalDir(igniteWorkDirectory(), locNodeId, jobId); |
| |
| if (jobLocDir.exists()) |
| U.delete(jobLocDir); |
| } |
| } |
| finally { |
| taskCtxClsPool.clear(); |
| |
| Throwable err = null; |
| |
| // Stop the daemon threads that have been created |
| // with the task class loaders: |
| while (true) { |
| Class<? extends HadoopTaskContext> cls = fullCtxClsQueue.poll(); |
| |
| if (cls == null) |
| break; |
| |
| try { |
| final ClassLoader ldr = cls.getClassLoader(); |
| |
| try { |
| // Stop Hadoop daemons for this *task*: |
| stopHadoopFsDaemons(ldr); |
| } |
| catch (Exception e) { |
| if (err == null) |
| err = e; |
| } |
| |
| // Also close all the FileSystems cached in |
| // HadoopLazyConcurrentMap for this *task* class loader: |
| closeCachedTaskFileSystems(ldr); |
| } |
| catch (Throwable e) { |
| if (err == null) |
| err = e; |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| } |
| } |
| |
| assert fullCtxClsQueue.isEmpty(); |
| |
| try { |
| // Close all cached file systems for this *Job*: |
| fsMap.close(); |
| } |
| catch (Exception e) { |
| if (err == null) |
| err = e; |
| } |
| |
| if (err != null) |
| throw U.cast(err); |
| } |
| } |
| |
| /** |
| * Stops Hadoop Fs daemon threads. |
| * @param ldr The task ClassLoader to stop the daemons for. |
| * @throws Exception On error. |
| */ |
| private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception { |
| Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.CLS_DAEMON); |
| |
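        // The daemon class is loaded through the task class loader; its static
        // dequeueAndStopAll() method stops all daemon threads registered in that environment.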
| Method m = daemonCls.getMethod("dequeueAndStopAll"); |
| |
| m.invoke(null); |
| } |
| |
| /** |
     * Closes all the file systems used by the task.
| * @param ldr The task class loader. |
| * @throws Exception On error. |
| */ |
| private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception { |
| Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName()); |
| |
| Method m = clazz.getMethod("close"); |
| |
| m.invoke(null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { |
| rsrcMgr.prepareTaskWorkDir(taskLocalDir(igniteWorkDirectory(), locNodeId, info)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { |
| HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); |
| |
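        // Return the context class to the pool so its class loading environment can be reused.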
| taskCtxClsPool.add(ctx.getClass()); |
| |
| File locDir = taskLocalDir(igniteWorkDirectory(), locNodeId, info); |
| |
| if (locDir.exists()) |
| U.delete(locDir); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cleanupStagingDirectory() { |
| rsrcMgr.cleanupStagingDirectory(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String igniteWorkDirectory() { |
| return helper.workDirectory(); |
| } |
| |
| /** |
| * Getter for job configuration. |
| * @return The job configuration. |
| */ |
| public JobConf jobConf() { |
| return jobConf; |
| } |
| |
| /** |
| * Gets file system for this job. |
     * @param uri The file system URI.
| * @param cfg The configuration. |
| * @return The file system. |
| * @throws IOException On error. |
| */ |
| public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { |
| return fileSystemForMrUserWithCaching(uri, cfg, fsMap); |
| } |
| |
| /** |
| * Create class loader with the given name. |
| * |
| * @param name Name. |
| * @return Class loader. |
| */ |
| private HadoopClassLoader createClassLoader(String name) { |
| return new HadoopClassLoader(rsrcMgr.classPath(), name, libNames, helper); |
| } |
| } |