| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.api.client; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.IOException; |
| import java.text.DecimalFormat; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.tez.common.CachedEntity; |
| import org.apache.tez.common.Preconditions; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.tez.client.FrameworkClient; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.DAGNotRunningException; |
| import org.apache.tez.dag.api.NoCurrentDAGException; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl; |
| import org.apache.tez.dag.api.records.DAGProtos; |
| |
| @Private |
| public class DAGClientImpl extends DAGClient { |
| private static final Logger LOG = LoggerFactory.getLogger(DAGClientImpl.class); |
| |
| private final ApplicationId appId; |
| private final String dagId; |
| private final TezConfiguration conf; |
| private final FrameworkClient frameworkClient; |
| /** |
| * Container to cache the last {@link DAGStatus}. |
| */ |
| private final CachedEntity<DAGStatus> cachedDAGStatusRef; |
| @VisibleForTesting |
| protected DAGClientInternal realClient; |
| private volatile boolean dagCompleted = false; |
| @VisibleForTesting |
| protected boolean isATSEnabled = false; |
| Map<String, VertexStatus> cachedVertexStatus = new HashMap<String, VertexStatus>(); |
| |
| private static final long SLEEP_FOR_COMPLETION = 500; |
| private static final long PRINT_STATUS_INTERVAL_MILLIS = 5000; |
| private final DecimalFormat formatter = new DecimalFormat("###.##%"); |
| private long lastPrintStatusTimeMillis; |
| private EnumSet<VertexStatus.State> vertexCompletionStates = EnumSet.of( |
| VertexStatus.State.SUCCEEDED, VertexStatus.State.FAILED, VertexStatus.State.KILLED, |
| VertexStatus.State.ERROR); |
| private long statusPollInterval; |
| private long diagnoticsWaitTimeout; |
| private boolean cleanupFrameworkClient; |
| |
| public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf, |
| @Nullable FrameworkClient frameworkClient, UserGroupInformation ugi) { |
| this.appId = appId; |
| this.dagId = dagId; |
| this.conf = conf; |
| if (frameworkClient != null) { |
| this.frameworkClient = frameworkClient; |
| } else { |
| this.frameworkClient = FrameworkClient.createFrameworkClient(conf); |
| this.frameworkClient.init(conf); |
| this.frameworkClient.start(); |
| cleanupFrameworkClient = true; |
| } |
| isATSEnabled = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "") |
| .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") && |
| conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, |
| TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) && |
| conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, |
| TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) && |
| DAGClientTimelineImpl.isSupported(); |
| |
| realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient, ugi); |
| statusPollInterval = conf.getLong( |
| TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS, |
| TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT); |
| if(statusPollInterval < 0) { |
| LOG.error("DAG Status poll interval cannot be negative and setting to default value."); |
| statusPollInterval = TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT; |
| } |
| this.diagnoticsWaitTimeout = conf.getLong( |
| TezConfiguration.TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS, |
| TezConfiguration.TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS_DEFAULT); |
| cachedDAGStatusRef = initCacheDAGRefFromConf(conf); |
| } |
| |
| /** |
| * Constructs a new {@link CachedEntity} for {@link DAGStatus}. |
| * @param tezConf TEZ configuration parameters. |
| * @return a caching entry to hold the {@link DAGStatus}. |
| */ |
| protected CachedEntity<DAGStatus> initCacheDAGRefFromConf(TezConfiguration tezConf) { |
| long clientDAGStatusCacheTimeOut = tezConf.getLong( |
| TezConfiguration.TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS, |
| TezConfiguration.TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS_DEFAULT); |
| if (clientDAGStatusCacheTimeOut <= 0) { |
| LOG.error("DAG Status cache timeout interval should be positive. Enforcing default value."); |
| clientDAGStatusCacheTimeOut = |
| TezConfiguration.TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS_DEFAULT; |
| } |
| return new CachedEntity<>(TimeUnit.SECONDS, clientDAGStatusCacheTimeOut); |
| } |
| |
| protected CachedEntity<DAGStatus> getCachedDAGStatusRef() { |
| return cachedDAGStatusRef; |
| } |
| |
| @Override |
| public String getExecutionContext() { |
| return realClient.getExecutionContext(); |
| } |
| |
| @Override |
| protected ApplicationReport getApplicationReportInternal() { |
| return realClient.getApplicationReportInternal(); |
| } |
| |
| @Override |
| public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions, |
| final long timeout) throws TezException, IOException { |
| |
| Preconditions.checkArgument(timeout >= -1, "Timeout must be >= -1"); |
| // Short circuit a timeout of 0. |
| if (timeout == 0) { |
| return getDAGStatusInternal(statusOptions, timeout); |
| } |
| |
| long startTime = System.currentTimeMillis(); |
| |
| DAGStatus dagStatus = cachedDAGStatusRef.getValue(); |
| boolean refreshStatus = true; |
| if (dagStatus == null) { |
| // the first lookup only or when the cachedDAG has expired |
| dagStatus = getDAGStatus(statusOptions); |
| refreshStatus = false; |
| } |
| |
| // Handling when client dag status init or submitted. This really implies that the RM was |
| // contacted to get status. INITING is never used. DAG_INITING implies a DagState of RUNNING. |
| if (dagStatus.getState() == DAGStatus.State.INITING |
| || dagStatus.getState() == DAGStatus.State.SUBMITTED) { |
| long timeoutAbsolute = startTime + timeout; |
| while (timeout < 0 |
| || (timeout > 0 && timeoutAbsolute > System.currentTimeMillis())) { |
| if (refreshStatus) { |
| // Try fetching the state with a timeout, in case the AM is already up. |
| dagStatus = getDAGStatusInternal(statusOptions, timeout); |
| } |
| refreshStatus = true; // For the next iteration of the loop. |
| |
| if (dagStatus.getState() == DAGStatus.State.RUNNING) { |
| // Refreshed status indicates that the DAG is running. |
| // This status could have come from the AM or the RM - client sleep if RM, otherwise send request to the AM. |
| if (dagStatus.getSource() == DagStatusSource.AM) { |
| // RUNNING + AM should only happen if timeout is > -1. |
| // Otherwise the AM ignored the -1 value, or the AM source in the DAGStatus is invalid. |
| Preconditions.checkState(timeout > -1, "Should not reach here with a timeout of -1. File a bug"); |
| return dagStatus; |
| } else { |
| // From the RM. Fall through to the Sleep. |
| } |
| } else if(dagStatus.getState() == DAGStatus.State.SUCCEEDED |
| || dagStatus.getState() == DAGStatus.State.FAILED |
| || dagStatus.getState() == DAGStatus.State.KILLED |
| || dagStatus.getState() == DAGStatus.State.ERROR) { |
| // Again, check if this was from the RM. If it was, try getting it from a more informative source. |
| if (dagStatus.getSource() == DagStatusSource.RM) { |
| return getDAGStatusInternal(statusOptions, 0); |
| } else { |
| return dagStatus; |
| } |
| } |
| // Sleep before checking again. |
| long currentStatusPollInterval; |
| if (timeout < 0) { |
| currentStatusPollInterval = statusPollInterval; |
| } else { |
| long remainingTimeout = timeoutAbsolute - System.currentTimeMillis(); |
| if (remainingTimeout < 0) { |
| // Timeout expired. Return the latest known dag status. |
| return dagStatus; |
| } else { |
| currentStatusPollInterval = remainingTimeout < statusPollInterval ? remainingTimeout : statusPollInterval; |
| } |
| } |
| try { |
| Thread.sleep(currentStatusPollInterval); |
| } catch (InterruptedException e) { |
| throw new TezException(e); |
| } |
| }// End of while |
| // Timeout may have expired before a single refresh |
| if (refreshStatus) { |
| return getDAGStatus(statusOptions); |
| } else { |
| return dagStatus; |
| } |
| } else { // Already running, or complete. Fallback to regular dagStatus with a timeout. |
| return getDAGStatusInternal(statusOptions, timeout); |
| } |
| } |
| |
| protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOptions, |
| long timeout) throws TezException, IOException { |
| |
| if (!dagCompleted) { |
| // fetch from AM. on Error and while DAG is still not completed (could not reach AM, AM got |
| // killed). return cached status. This prevents the progress being reset (for ex fetching from |
| // RM does not give status). |
| |
| // dagCompleted may be reset within getDagStatusViaAM |
| final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout); |
| |
| if (!dagCompleted) { |
| if (dagStatus != null) { // update the cached DAGStatus |
| cachedDAGStatusRef.setValue(dagStatus); |
| return dagStatus; |
| } |
| DAGStatus cachedDAG = cachedDAGStatusRef.getValue(); |
| if (cachedDAG != null) { |
| // could not get from AM (not reachable/ was killed). return cached status. |
| return cachedDAG; |
| } |
| } |
| |
| if (isATSEnabled && dagCompleted) { |
| switchToTimelineClient(); |
| } |
| } |
| |
| if (isATSEnabled && dagCompleted) { |
| try { |
| // fetch from ATS and return only if status is completed. |
| DAGStatus dagStatus = realClient.getDAGStatus(statusOptions); |
| if (dagStatus.isCompleted()) { |
| return dagStatus; |
| } |
| } catch (ApplicationNotFoundException e) { |
| LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline" |
| + " - Application not found by YARN", e); |
| } catch (TezException e) { |
| LOG.debug("DAGStatus fetch failed", e); |
| } |
| } |
| |
| // dag completed and Timeline service is either not enabled or does not have completion status |
| // return cached status if completion info is present. |
| if (dagCompleted) { |
| DAGStatus cachedDag = cachedDAGStatusRef.getValue(); |
| if (cachedDag != null && cachedDag.isCompleted()) { |
| return cachedDag; |
| } |
| } |
| |
| // everything else fails rely on RM. |
| return getDAGStatusViaRM(); |
| } |
| |
| @Override |
| public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws |
| TezException, IOException { |
| return getDAGStatusInternal(statusOptions, 0); |
| } |
| |
| @Override |
| public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) |
| throws IOException, TezException { |
| return getVertexStatusInternal(statusOptions, vertexName); |
| } |
| |
| protected VertexStatus getVertexStatusInternal(Set<StatusGetOpts> statusOptions, String vertexName) |
| throws IOException, TezException { |
| if (!dagCompleted) { |
| VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions); |
| |
| if (!dagCompleted) { |
| if (vertexStatus != null) { |
| cachedVertexStatus.put(vertexName, vertexStatus); |
| return vertexStatus; |
| } |
| if (cachedVertexStatus.containsKey(vertexName)) { |
| return cachedVertexStatus.get(vertexName); |
| } |
| } |
| |
| if (isATSEnabled && dagCompleted) { |
| switchToTimelineClient(); |
| } |
| } |
| |
| if (isATSEnabled && dagCompleted) { |
| try { |
| final VertexStatus vertexStatus = realClient.getVertexStatus(vertexName, statusOptions); |
| if (vertexCompletionStates.contains(vertexStatus.getState())) { |
| return vertexStatus; |
| } |
| } catch (ApplicationNotFoundException e) { |
| LOG.info("Failed to fetch Vertex data for completed DAG from YARN Timeline" |
| + " - Application not found by YARN", e); |
| return null; |
| } catch (TezException e) { |
| LOG.debug("ERROR fetching vertex data from Yarn Timeline", e); |
| } |
| } |
| |
| if (cachedVertexStatus.containsKey(vertexName)) { |
| final VertexStatus vertexStatus = cachedVertexStatus.get(vertexName); |
| if (vertexCompletionStates.contains(vertexStatus.getState())) { |
| return vertexStatus; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public String getDagIdentifierString() { |
| return realClient.getDagIdentifierString(); |
| } |
| |
| @Override |
| public String getSessionIdentifierString() { |
| return realClient.getSessionIdentifierString(); |
| } |
| |
| @Override |
| public void tryKillDAG() throws IOException, TezException { |
| if (!dagCompleted) { |
| realClient.tryKillDAG(); |
| } else { |
| LOG.info("TryKill for app: " + appId + " dag:" + dagId + " dag already completed."); |
| } |
| } |
| |
| @Override |
| public DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException { |
| return _waitForCompletionWithStatusUpdates(timeMs, false, EnumSet.noneOf(StatusGetOpts.class)); |
| } |
| |
| @Override |
| public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException { |
| return _waitForCompletionWithStatusUpdates(-1, false, EnumSet.noneOf(StatusGetOpts.class)); |
| } |
| |
| @Override |
| public DAGStatus waitForCompletionWithStatusUpdates( |
| @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, |
| InterruptedException { |
| return _waitForCompletionWithStatusUpdates(-1, true, statusGetOpts); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| realClient.close(); |
| if (frameworkClient != null && cleanupFrameworkClient) { |
| frameworkClient.stop(); |
| } |
| } |
| |
| /** |
| * Get the DAG status via the AM |
| * @param statusOptions |
| * @param timeout |
| * @return null if the AM cannot be contacted, otherwise the DAGstatus |
| * @throws IOException |
| */ |
| private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions, |
| long timeout) throws IOException { |
| DAGStatus dagStatus = null; |
| try { |
| dagStatus = realClient.getDAGStatus(statusOptions, timeout); |
| } catch (DAGNotRunningException e) { |
| LOG.info("DAG is no longer running", e); |
| dagCompleted = true; |
| } catch (ApplicationNotFoundException e) { |
| LOG.info("DAG is no longer running - application not found by YARN", e); |
| dagCompleted = true; |
| } catch (NoCurrentDAGException e) { |
| LOG.info("Got NoCurrentDAGException from AM, returning a failed DAG", e); |
| return dagLost(); |
| } catch (TezException e) { |
| // can be either due to a n/w issue or due to AM completed. |
| LOG.info("Cannot retrieve DAG Status due to TezException: {}", e.getMessage()); |
| } catch (IOException e) { |
| // can be either due to a n/w issue or due to AM completed. |
| LOG.info("Cannot retrieve DAG Status due to IOException: {}", e.getMessage()); |
| } |
| |
| if (dagStatus == null && !dagCompleted) { |
| checkAndSetDagCompletionStatus(); |
| } |
| |
| return dagStatus; |
| } |
| |
| private DAGStatus dagLost() { |
| DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder(); |
| DAGStatus dagStatus = new DAGStatus(builder, DagStatusSource.AM); |
| builder.setState(DAGProtos.DAGStatusStateProto.DAG_FAILED); |
| builder.addAllDiagnostics(Collections.singleton(NoCurrentDAGException.MESSAGE_PREFIX)); |
| return dagStatus; |
| } |
| |
| private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts> statusOptions) throws |
| IOException { |
| VertexStatus vertexStatus = null; |
| try { |
| vertexStatus = realClient.getVertexStatus(vertexName, statusOptions); |
| } catch (DAGNotRunningException e) { |
| LOG.info("DAG is no longer running", e); |
| dagCompleted = true; |
| } catch (ApplicationNotFoundException e) { |
| LOG.info("DAG is no longer running - application not found by YARN", e); |
| dagCompleted = true; |
| } catch (TezException e) { |
| // can be either due to a n/w issue or due to AM completed. |
| LOG.info("Cannot retrieve Vertex Status due to TezException: {}", e.getMessage()); |
| } catch (IOException e) { |
| // can be either due to a n/w issue of due to AM completed. |
| LOG.info("Cannot retrieve Vertex Status due to IOException: {}", e.getMessage()); |
| } |
| |
| if (vertexStatus == null && !dagCompleted) { |
| checkAndSetDagCompletionStatus(); |
| } |
| |
| return vertexStatus; |
| } |
| |
| /** |
| * Get the DAG status via the YARN ResourceManager |
| * @return the dag status, inferred from the RM App state. Does not return null. |
| * @throws TezException |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| protected DAGStatus getDAGStatusViaRM() throws TezException, IOException { |
| LOG.debug("Get DAG status via framework client for app: {} dag: {}", appId, dagId); |
| ApplicationReport appReport; |
| try { |
| appReport = frameworkClient.getApplicationReport(appId); |
| LOG.debug("Got appReport from framework client: {}", appReport); |
| } catch (ApplicationNotFoundException e) { |
| LOG.info("DAG is no longer running - application not found by YARN", e); |
| throw new DAGNotRunningException(e); |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| |
| if(appReport == null) { |
| throw new TezException("Unknown/Invalid appId: " + appId); |
| } |
| |
| DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder(); |
| DAGStatus dagStatus = new DAGStatus(builder, DagStatusSource.RM); |
| DAGProtos.DAGStatusStateProto dagState; |
| switch (appReport.getYarnApplicationState()) { |
| case NEW: |
| case NEW_SAVING: |
| case SUBMITTED: |
| case ACCEPTED: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_SUBMITTED; |
| break; |
| case RUNNING: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_RUNNING; |
| break; |
| case FAILED: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_FAILED; |
| break; |
| case KILLED: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_KILLED; |
| break; |
| case FINISHED: |
| switch(appReport.getFinalApplicationStatus()) { |
| case UNDEFINED: |
| case FAILED: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_FAILED; |
| break; |
| case KILLED: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_KILLED; |
| break; |
| case SUCCEEDED: |
| dagState = DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED; |
| break; |
| default: |
| throw new TezUncheckedException("Encountered unknown final application" |
| + " status from YARN" |
| + ", appState=" + appReport.getYarnApplicationState() |
| + ", finalStatus=" + appReport.getFinalApplicationStatus()); |
| } |
| break; |
| default: |
| throw new TezUncheckedException("Encountered unknown application state" |
| + " from YARN, appState=" + appReport.getYarnApplicationState()); |
| } |
| |
| builder.setState(dagState); |
| // workaround before YARN-2560 is fixed |
| if (appReport.getFinalApplicationStatus() == FinalApplicationStatus.FAILED |
| || appReport.getFinalApplicationStatus() == FinalApplicationStatus.KILLED) { |
| long startTime = System.currentTimeMillis(); |
| while((appReport.getDiagnostics() == null |
| || appReport.getDiagnostics().isEmpty()) |
| && (System.currentTimeMillis() - startTime) < diagnoticsWaitTimeout) { |
| try { |
| Thread.sleep(100); |
| appReport = frameworkClient.getApplicationReport(appId); |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } catch (InterruptedException e) { |
| throw new TezException(e); |
| } |
| } |
| } |
| if (appReport.getDiagnostics() != null) { |
| builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics())); |
| } |
| return dagStatus; |
| } |
| |
| private DAGStatus _waitForCompletionWithStatusUpdates(long timeMs, |
| boolean vertexUpdates, |
| @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException { |
| DAGStatus dagStatus; |
| boolean initPrinted = false; |
| boolean runningPrinted = false; |
| double dagProgress = -1.0; // Print the first one |
| // monitoring |
| Long maxNs = timeMs >= 0 ? (System.nanoTime() + (timeMs * 1000000L)) : null; |
| while (true) { |
| try { |
| dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION); |
| } catch (DAGNotRunningException ex) { |
| return null; |
| } |
| if (!initPrinted |
| && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) { |
| initPrinted = true; // Print once |
| log("Waiting for DAG to start running"); |
| } |
| if (dagStatus.getState() == DAGStatus.State.RUNNING |
| || dagStatus.getState() == DAGStatus.State.SUCCEEDED |
| || dagStatus.getState() == DAGStatus.State.FAILED |
| || dagStatus.getState() == DAGStatus.State.KILLED |
| || dagStatus.getState() == DAGStatus.State.ERROR) { |
| break; |
| } |
| if (maxNs != null && System.nanoTime() > maxNs) { |
| return null; |
| } |
| }// End of while(true) |
| |
| Set<String> vertexNames = Collections.emptySet(); |
| while (!dagStatus.isCompleted()) { |
| if (!runningPrinted) { |
| log("DAG initialized: CurrentState=Running"); |
| runningPrinted = true; |
| } |
| if (vertexUpdates && vertexNames.isEmpty()) { |
| vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet(); |
| } |
| dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus); |
| try { |
| dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION); |
| } catch (DAGNotRunningException ex) { |
| return null; |
| } |
| if (maxNs != null && System.nanoTime() > maxNs) { |
| return null; |
| } |
| }// end of while |
| // Always print the last status irrespective of progress change |
| monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus); |
| log("DAG completed. " + "FinalState=" + dagStatus.getState()); |
| return dagStatus; |
| } |
| |
| private double monitorProgress(Set<String> vertexNames, double prevDagProgress, |
| Set<StatusGetOpts> opts, DAGStatus dagStatus) throws IOException, TezException { |
| Progress progress = dagStatus.getDAGProgress(); |
| double dagProgress = prevDagProgress; |
| if (progress != null) { |
| dagProgress = getProgress(progress); |
| boolean progressChanged = dagProgress > prevDagProgress; |
| long currentTimeMillis = System.currentTimeMillis(); |
| long timeSinceLastPrintStatus = currentTimeMillis - lastPrintStatusTimeMillis; |
| boolean printIntervalExpired = timeSinceLastPrintStatus > PRINT_STATUS_INTERVAL_MILLIS; |
| if (progressChanged || printIntervalExpired) { |
| lastPrintStatusTimeMillis = currentTimeMillis; |
| printDAGStatus(vertexNames, opts, dagStatus, progress); |
| } |
| } |
| |
| return dagProgress; |
| } |
| |
| private void printDAGStatus(Set<String> vertexNames, Set<StatusGetOpts> opts, |
| DAGStatus dagStatus, Progress dagProgress) throws IOException, TezException { |
| double vProgressFloat = 0.0f; |
| log("DAG: State: " + dagStatus.getState() + " Progress: " |
| + formatter.format(getProgress(dagProgress)) + " " + dagProgress); |
| boolean displayCounter = opts != null && opts.contains(StatusGetOpts.GET_COUNTERS); |
| if (displayCounter) { |
| TezCounters counters = dagStatus.getDAGCounters(); |
| if (counters != null) { |
| log("DAG Counters:\n" + counters); |
| } |
| } |
| for (String vertex : vertexNames) { |
| VertexStatus vStatus = getVertexStatus(vertex, opts); |
| if (vStatus == null) { |
| log("Could not retrieve status for vertex: " + vertex); |
| continue; |
| } |
| Progress vProgress = vStatus.getProgress(); |
| if (vProgress != null) { |
| vProgressFloat = 0.0f; |
| if (vProgress.getTotalTaskCount() == 0) { |
| vProgressFloat = 1.0f; |
| } else if (vProgress.getTotalTaskCount() > 0) { |
| vProgressFloat = getProgress(vProgress); |
| } |
| log("\tVertexStatus:" + " VertexName: " + vertex + " Progress: " |
| + formatter.format(vProgressFloat) + " " + vProgress); |
| } |
| if (displayCounter) { |
| TezCounters counters = vStatus.getVertexCounters(); |
| if (counters != null) { |
| log("Vertex Counters for " + vertex + ":\n" + counters); |
| } |
| } |
| } // end of for loop |
| } |
| |
| private void checkAndSetDagCompletionStatus() { |
| ApplicationReport appReport = realClient.getApplicationReportInternal(); |
| if (appReport != null) { |
| final YarnApplicationState appState = appReport.getYarnApplicationState(); |
| if (appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.FAILED || |
| appState == YarnApplicationState.KILLED) { |
| dagCompleted = true; |
| } |
| } |
| } |
| |
| private void switchToTimelineClient() throws IOException, TezException { |
| realClient.close(); |
| realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient, |
| (int) (2 * PRINT_STATUS_INTERVAL_MILLIS)); |
| LOG.debug("dag completed switching to DAGClientTimelineImpl"); |
| } |
| |
| @VisibleForTesting |
| public DAGClientInternal getRealClient() { |
| return realClient; |
| } |
| |
| @Override |
| public String getWebUIAddress() throws IOException, TezException { |
| return realClient.getWebUIAddress(); |
| } |
| |
| private double getProgress(Progress progress) { |
| return (progress.getTotalTaskCount() == 0 ? 0.0 : (double) (progress.getSucceededTaskCount()) |
| / progress.getTotalTaskCount()); |
| } |
| |
| private void log(String message) { |
| LOG.info(message); |
| } |
| } |