| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.runtime.task; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URL; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedExceptionAction; |
| import java.text.SimpleDateFormat; |
| import java.util.Calendar; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.FutureTask; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.annotation.Nullable; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.log4j.helpers.ThreadLocalMap; |
| import org.apache.tez.common.ContainerContext; |
| import org.apache.tez.common.ContainerTask; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.TezExecutors; |
| import org.apache.tez.common.TezLocalResource; |
| import org.apache.tez.common.TezSharedExecutor; |
| import org.apache.tez.common.TezTaskUmbilicalProtocol; |
| import org.apache.tez.common.TezUtilsInternal; |
| import org.apache.tez.common.counters.Limits; |
| import org.apache.tez.common.security.JobTokenIdentifier; |
| import org.apache.tez.common.security.TokenCache; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.records.DAGProtos; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.dag.utils.RelocalizationUtils; |
| import org.apache.tez.hadoop.shim.HadoopShim; |
| import org.apache.tez.hadoop.shim.HadoopShimsLoader; |
| import org.apache.tez.runtime.TezThreadDumpHelper; |
| import org.apache.tez.runtime.api.ExecutionContext; |
| import org.apache.tez.runtime.api.impl.ExecutionContextImpl; |
| import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; |
| import org.apache.tez.runtime.internals.api.TaskReporterInterface; |
| import org.apache.tez.util.LoggingUtils; |
| |
| import org.apache.tez.util.TezRuntimeShutdownHandler; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Function; |
| import org.apache.tez.common.Preconditions; |
| import org.apache.tez.common.TezClassLoader; |
| |
| import com.google.common.collect.HashMultimap; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multimap; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| public class TezChild { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TezChild.class); |
| |
| private final Configuration defaultConf; |
| private final String containerIdString; |
| private final int appAttemptNumber; |
| private final String[] localDirs; |
| |
| private final AtomicLong heartbeatCounter = new AtomicLong(0); |
| |
| private final int getTaskMaxSleepTime; |
| private final int amHeartbeatInterval; |
| private final long sendCounterInterval; |
| private final int maxEventsToGet; |
| private final String workingDir; |
| |
| private final ListeningExecutorService executor; |
| private final ObjectRegistryImpl objectRegistry; |
| private final String pid; |
| private final ExecutionContext executionContext; |
| private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); |
| private final Map<String, String> serviceProviderEnvMap; |
| private final Credentials credentials; |
| private final long memAvailable; |
| private final AtomicBoolean isShutdown = new AtomicBoolean(false); |
| private final String user; |
| private final boolean updateSysCounters; |
| private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; |
| |
| private Multimap<String, String> startedInputsMap = HashMultimap.create(); |
| private final boolean ownUmbilical; |
| |
| private final TezTaskUmbilicalProtocol umbilical; |
| private TaskReporterInterface taskReporter; |
| private int taskCount = 0; |
| private TezVertexID lastVertexID; |
| private final HadoopShim hadoopShim; |
| private final TezExecutors sharedExecutor; |
| private ThreadLocalMap mdcContext; |
| |
| public TezChild(Configuration conf, String host, int port, String containerIdentifier, |
| String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs, |
| Map<String, String> serviceProviderEnvMap, |
| ObjectRegistryImpl objectRegistry, String pid, |
| ExecutionContext executionContext, |
| Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical, |
| boolean updateSysCounters, HadoopShim hadoopShim) throws IOException, InterruptedException { |
| this.mdcContext = LoggingUtils.setupLog4j(); |
| this.defaultConf = conf; |
| this.containerIdString = containerIdentifier; |
| this.appAttemptNumber = appAttemptNumber; |
| this.localDirs = localDirs; |
| this.serviceProviderEnvMap = serviceProviderEnvMap; |
| this.workingDir = workingDir; |
| this.pid = pid; |
| this.executionContext = executionContext; |
| this.credentials = credentials; |
| this.memAvailable = memAvailable; |
| this.user = user; |
| this.updateSysCounters = updateSysCounters; |
| this.hadoopShim = hadoopShim; |
| this.sharedExecutor = new TezSharedExecutor(defaultConf); |
| |
| getTaskMaxSleepTime = defaultConf.getInt( |
| TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX, |
| TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT); |
| |
| amHeartbeatInterval = defaultConf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, |
| TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT); |
| |
| sendCounterInterval = defaultConf.getLong( |
| TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS, |
| TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT); |
| |
| maxEventsToGet = defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, |
| TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT); |
| |
| ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() |
| .setDaemon(true).setNameFormat("TezChild").build()); |
| this.executor = MoreExecutors.listeningDecorator(executor); |
| |
| this.objectRegistry = objectRegistry; |
| |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Executing with tokens:"); |
| for (Token<?> token : credentials.getAllTokens()) { |
| LOG.debug("{}", token); |
| } |
| } |
| |
| UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier); |
| Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); |
| |
| String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, |
| TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); |
| serviceConsumerMetadata.put(auxiliaryService, |
| TezCommonUtils.convertJobTokenToBytes(jobToken)); |
| |
| if (umbilical == null) { |
| final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port); |
| SecurityUtil.setTokenService(jobToken, address); |
| taskOwner.addToken(jobToken); |
| this.umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { |
| @Override |
| public TezTaskUmbilicalProtocol run() throws Exception { |
| return RPC.getProxy(TezTaskUmbilicalProtocol.class, |
| TezTaskUmbilicalProtocol.versionID, address, defaultConf); |
| } |
| }); |
| ownUmbilical = true; |
| } else { |
| this.umbilical = umbilical; |
| ownUmbilical = false; |
| } |
| TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); |
| } |
| |
| public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { |
| |
| ContainerContext containerContext = new ContainerContext(containerIdString); |
| ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, |
| getTaskMaxSleepTime); |
| |
| taskReporter = new TaskReporter(umbilical, amHeartbeatInterval, |
| sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString); |
| |
| UserGroupInformation childUGI = null; |
| |
| while (!executor.isTerminated() && !isShutdown.get()) { |
| if (taskCount > 0) { |
| TezUtilsInternal.updateLoggers(defaultConf, "", LoggingUtils.getPatternForTask(defaultConf)); |
| } |
| ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter); |
| boolean error = false; |
| ContainerTask containerTask = null; |
| try { |
| containerTask = getTaskFuture.get(); |
| } catch (ExecutionException e) { |
| error = true; |
| Throwable cause = e.getCause(); |
| LOG.error("Error fetching new work for container {}", containerIdString, |
| cause); |
| return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, |
| cause, "Execution Exception while fetching new work: " + e.getMessage()); |
| } catch (InterruptedException e) { |
| error = true; |
| LOG.info("Interrupted while waiting for new work for container {}", containerIdString); |
| return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e, |
| "Interrupted while waiting for new work"); |
| } finally { |
| if (error) { |
| shutdown(); |
| } |
| } |
| |
| TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID(); |
| Configuration taskConf; |
| if (containerTask.getTaskSpec().getTaskConf() != null) { |
| Configuration copy = new Configuration(defaultConf); |
| TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), copy); |
| taskConf = copy; |
| LoggingUtils.initLoggingContext(mdcContext, copy, |
| attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString()); |
| } else { |
| taskConf = defaultConf; |
| LoggingUtils.initLoggingContext(mdcContext, defaultConf, |
| attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString()); |
| } |
| |
| TezCommonUtils.logCredentials(LOG, containerTask.getCredentials(), "containerTask"); |
| if (containerTask.shouldDie()) { |
| LOG.info("ContainerTask returned shouldDie=true for container {}, Exiting", containerIdString); |
| shutdown(); |
| return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, |
| "Asked to die by the AM"); |
| } else { |
| String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString(); |
| taskCount++; |
| String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); |
| System.err.println(timeStamp + " Starting to run new task attempt: " + |
| containerTask.getTaskSpec().getTaskAttemptID().toString()); |
| System.out.println(timeStamp + " Starting to run new task attempt: " + |
| containerTask.getTaskSpec().getTaskAttemptID().toString()); |
| TezUtilsInternal.setHadoopCallerContext(hadoopShim, |
| containerTask.getTaskSpec().getTaskAttemptID()); |
| TezUtilsInternal.updateLoggers(defaultConf, loggerAddend, LoggingUtils.getPatternForTask(defaultConf)); |
| |
| FileSystem.clearStatistics(); |
| |
| childUGI = handleNewTaskCredentials(containerTask, childUGI); |
| TezCommonUtils.logCredentials(LOG, childUGI.getCredentials(), "taskChildUGI"); |
| handleNewTaskLocalResources(containerTask, childUGI); |
| cleanupOnTaskChanged(containerTask); |
| |
| // Execute the Actual Task |
| TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI, |
| localDirs, containerTask.getTaskSpec(), appAttemptNumber, |
| serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, |
| executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters, |
| hadoopShim, sharedExecutor); |
| |
| boolean shouldDie; |
| tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString()); |
| try { |
| TaskRunner2Result result = taskRunner.run(); |
| LOG.info("TaskRunner2Result: {}", result); |
| shouldDie = result.isContainerShutdownRequested(); |
| if (shouldDie) { |
| LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString); |
| shutdown(); |
| return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, |
| "Asked to die by the AM"); |
| } |
| if (result.getError() != null) { |
| Throwable e = result.getError(); |
| handleError(result.getError()); |
| return new ContainerExecutionResult( |
| ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, |
| e, "TaskExecutionFailure: " + e.getMessage()); |
| } |
| } finally { |
| tezThreadDumpHelper.stop(); |
| FileSystem.closeAllForUGI(childUGI); |
| } |
| } |
| } |
| return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, |
| null); |
| } |
| |
| /** |
| * Setup |
| * |
| * @param containerTask |
| * the new task specification. Must be a valid task |
| * @param childUGI |
| * the old UGI instance being used |
| * @return childUGI |
| */ |
| UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask, |
| UserGroupInformation childUGI) { |
| // Re-use the UGI only if the Credentials have not changed. |
| Preconditions.checkState(!containerTask.shouldDie()); |
| Preconditions.checkState(containerTask.getTaskSpec() != null); |
| if (containerTask.haveCredentialsChanged()) { |
| Credentials taskCreds = containerTask.getCredentials(); |
| if (taskCreds != null) { |
| LOG.info("Refreshing UGI since Credentials have changed. Credentials : #Tokens=" + |
| taskCreds.numberOfTokens() + ", #SecretKeys=" |
| + taskCreds.numberOfSecretKeys()); |
| childUGI = UserGroupInformation.createRemoteUser(user); |
| childUGI.addCredentials(containerTask.getCredentials()); |
| } else { |
| LOG.info("Not loading any credentials, since no credentials provided"); |
| } |
| } |
| return childUGI; |
| } |
| |
| /** |
| * Handles any additional resources to be localized for the new task |
| * |
| * @param containerTask |
| * @throws IOException |
| * @throws TezException |
| */ |
| private void handleNewTaskLocalResources(ContainerTask containerTask, |
| UserGroupInformation ugi) throws IOException, TezException { |
| |
| final Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources(); |
| LOG.debug("Additional Resources added to container: {}", additionalResources); |
| |
| if (additionalResources != null && !additionalResources.isEmpty()) { |
| LOG.info("Localizing additional local resources for Task : " + additionalResources); |
| |
| try { |
| List<URL> downloadedUrls = ugi.doAs(new PrivilegedExceptionAction<List<URL>>() { |
| @Override |
| public List<URL> run() throws Exception { |
| return RelocalizationUtils.processAdditionalResources( |
| Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() { |
| @Override |
| public URI apply(TezLocalResource input) { |
| return input.getUri(); |
| } |
| }), defaultConf, workingDir); |
| } |
| }); |
| RelocalizationUtils.addUrlsToClassPath(downloadedUrls); |
| } catch (InterruptedException e) { |
| throw new TezException(e); |
| } |
| LOG.info("Done localizing additional resources"); |
| } |
| } |
| |
| /** |
| * Cleans entries from the object registry, and resets the startedInputsMap if required |
| * |
| * @param containerTask |
| * the new task specification. Must be a valid task |
| */ |
| private void cleanupOnTaskChanged(ContainerTask containerTask) { |
| Preconditions.checkState(!containerTask.shouldDie()); |
| Preconditions.checkState(containerTask.getTaskSpec() != null); |
| TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getVertexID(); |
| if (lastVertexID != null) { |
| if (!lastVertexID.equals(newVertexID)) { |
| objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.VERTEX); |
| } |
| if (!lastVertexID.getDAGID().equals(newVertexID.getDAGID())) { |
| objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.DAG); |
| startedInputsMap = HashMultimap.create(); |
| } |
| } |
| lastVertexID = newVertexID; |
| } |
| |
| public void shutdown() { |
| LOG.info("Shutdown invoked for container {}", containerIdString); |
| if (!isShutdown.getAndSet(true)) { |
| LOG.info("Shutting down container {}", containerIdString); |
| // It's possible that there's pending tasks on the executor. Those should be cancelled. |
| List<Runnable> pendingRunnables = executor.shutdownNow(); |
| LOG.info("There are {} runnables in shared executor, cancelling those...", pendingRunnables.size()); |
| for (Runnable r : pendingRunnables) { |
| LOG.info("Cancelling pending runnable ({}) during TezChild shutdown for containerId={}", r.hashCode(), |
| containerIdString); |
| ((FutureTask)r).cancel(false); |
| } |
| if (taskReporter != null) { |
| taskReporter.shutdown(); |
| } |
| if (ownUmbilical) { |
| RPC.stopProxy(umbilical); |
| } |
| } |
| |
| TezRuntimeShutdownHandler.shutdown(); |
| LOG.info("TezChild shutdown finished"); |
| } |
| |
| public static class ContainerExecutionResult { |
| public static enum ExitStatus { |
| SUCCESS(0), |
| EXECUTION_FAILURE(1), |
| INTERRUPTED(2), |
| ASKED_TO_DIE(3); |
| |
| private final int exitCode; |
| |
| ExitStatus(int code) { |
| this.exitCode = code; |
| } |
| |
| public int getExitCode() { |
| return this.exitCode; |
| } |
| } |
| |
| private final ExitStatus exitStatus; |
| private final Throwable throwable; |
| private final String errorMessage; |
| |
| public ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable, |
| @Nullable String errorMessage) { |
| this.exitStatus = exitStatus; |
| this.throwable = throwable; |
| this.errorMessage = errorMessage; |
| } |
| |
| public ExitStatus getExitStatus() { |
| return this.exitStatus; |
| } |
| |
| public Throwable getThrowable() { |
| return this.throwable; |
| } |
| |
| public String getErrorMessage() { |
| return this.errorMessage; |
| } |
| |
| @Override |
| public String toString() { |
| return "ContainerExecutionResult{" + |
| "exitStatus=" + exitStatus + |
| ", throwable=" + throwable + |
| ", errorMessage='" + errorMessage + '\'' + |
| '}'; |
| } |
| } |
| |
| public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier, |
| String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory, |
| Map<String, String> serviceProviderEnvMap, @Nullable String pid, |
| ExecutionContext executionContext, Credentials credentials, long memAvailable, String user, |
| TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters, HadoopShim hadoopShim) |
| throws IOException, InterruptedException, TezException { |
| |
| // Pull in configuration specified for the session. |
| // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file |
| // for each and every task, and reading it back from disk. Also needs to be per vertex. |
| Limits.setConfiguration(conf); |
| |
| TezUtilsInternal.setSecurityUtilConfigration(LOG, conf); |
| |
| // singleton of ObjectRegistry for this JVM |
| ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl(); |
| |
| return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier, |
| attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid, |
| executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters, |
| hadoopShim); |
| } |
| |
| public static void main(String[] args) throws IOException, InterruptedException, TezException { |
| TezClassLoader.setupTezClassLoader(); |
| final Configuration defaultConf = new Configuration(); |
| |
| Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); |
| final String pid = System.getenv().get("JVM_PID"); |
| |
| |
| assert args.length == 5; |
| String host = args[0]; |
| int port = Integer.parseInt(args[1]); |
| final String containerIdentifier = args[2]; |
| final String tokenIdentifier = args[3]; |
| final int attemptNumber = Integer.parseInt(args[4]); |
| final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS |
| .name())); |
| LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port |
| + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber |
| + " tokenIdentifier: " + tokenIdentifier); |
| } |
| |
| // Security framework already loaded the tokens into current ugi |
| DAGProtos.ConfigurationProto confProto = |
| TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); |
| TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList()); |
| UserGroupInformation.setConfiguration(defaultConf); |
| Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); |
| |
| HadoopShim hadoopShim = new HadoopShimsLoader(defaultConf).getHadoopShim(); |
| |
| // log the system properties |
| if (LOG.isInfoEnabled()) { |
| String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(defaultConf); |
| if (systemPropsToLog != null) { |
| LOG.info(systemPropsToLog); |
| } |
| } |
| |
| TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier, |
| tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()), |
| System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())), |
| credentials, Runtime.getRuntime().maxMemory(), System |
| .getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim); |
| ContainerExecutionResult result = tezChild.run(); |
| LOG.info("TezChild is about to exit from main(), run() returned result: {}", result.toString()); |
| } |
| |
| private void handleError(Throwable t) { |
| shutdown(); |
| } |
| |
| } |