// blob: 80ef5ad4bd55264349c7f7968e7bc9e69aa1ab92 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import com.google.common.collect.Maps;
/**
 * Wrapper class that encapsulates a Tez DAG. This class mediates between Tez DAGs
 * and JobControl.
 *
 * <p>{@link #run()} obtains a client from {@code TezSessionManager}, submits the
 * DAG, and then polls the DAG status (sleeping one second between polls) until
 * the DAG completes or its status can no longer be retrieved. While polling, a
 * {@link Timer} periodically logs the DAG status whenever it has changed.
 *
 * <p>The fields published by {@link #run()} are {@code volatile} because they are
 * also read from the status reporter's timer thread and through the public
 * accessors.
 */
public class TezJob implements Runnable {
    private static final Log log = LogFactory.getLog(TezJob.class);

    /** Pause between two DAG status polls in {@link #run()}, in milliseconds. */
    private static final long STATUS_POLL_INTERVAL_MS = 1000L;

    /** Delay before the first periodic status report, in milliseconds. */
    private static final long STATUS_REPORT_INITIAL_DELAY_MS = 1000L;

    /** Interval between periodic status reports used when none is configured, in seconds. */
    private static final long DEFAULT_STATUS_REPORT_INTERVAL_SECS = 20L;

    // Assigned once in the constructor.
    private final TezConfiguration conf;
    private final EnumSet<StatusGetOpts> statusGetOpts;
    private final Map<String, LocalResource> requestAMResources;
    private final DAG dag;
    private final boolean reuseSession;
    private final TezJobConfig tezJobConf;

    // Written by run(); read by DAGStatusReporter on the timer thread and by the
    // public accessors, hence volatile.
    private volatile ApplicationId appId;
    private volatile DAGClient dagClient;
    private volatile DAGStatus dagStatus;
    private volatile TezClient tezClient;
    private volatile TezCounters dagCounters;

    // Timer for DAG status reporter
    private Timer timer;
    private TezPigScriptStats pigStats;

    /**
     * Creates a job for the given DAG.
     *
     * @param conf configuration; also supplies the session-reuse flag
     *        ({@code PigConfiguration.PIG_TEZ_SESSION_REUSE}, default {@code true})
     * @param dag the DAG to submit when {@link #run()} is invoked
     * @param requestAMResources resources handed to {@code TezSessionManager.getClient}
     * @param tezPlan plan from which the {@link TezJobConfig} figures are derived
     * @throws IOException if visiting {@code tezPlan} fails
     */
    public TezJob(TezConfiguration conf, DAG dag,
            Map<String, LocalResource> requestAMResources,
            TezOperPlan tezPlan) throws IOException {
        this.conf = conf;
        this.dag = dag;
        this.requestAMResources = requestAMResources;
        this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
        this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
        this.tezJobConf = new TezJobConfig(tezPlan);
    }

    /**
     * Figures derived from a {@link TezOperPlan}: the plan's estimated total
     * parallelism, the largest number of out-edges on any single non-vertex-group
     * operator, and the number of non-vertex-group operators. An instance is passed
     * to {@code TezSessionManager.getClient} when the job runs.
     */
    static class TezJobConfig {
        private final int estimatedTotalParallelism;
        private final int maxOutputsinSingleVertex;
        private final int totalVertices;

        /**
         * @param tezPlan the plan to inspect
         * @throws VisitorException if walking the plan fails
         */
        public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
            this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
            MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
            finder.visit();
            this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
            this.totalVertices = finder.getTotalVertices();
        }

        public int getEstimatedTotalParallelism() {
            return estimatedTotalParallelism;
        }

        public int getMaxOutputsinSingleVertex() {
            return maxOutputsinSingleVertex;
        }

        public int getTotalVertices() {
            return totalVertices;
        }
    }

    /**
     * Plan visitor that counts the operators that are not vertex groups and tracks
     * the largest out-edge count seen among them. The maximum starts at 1, so it
     * is never reported as less than 1.
     */
    private static class MaxOutputsFinder extends TezOpPlanVisitor {
        private int maxOutputsinSingleVertex = 1;
        private int totalVertices = 0;

        public MaxOutputsFinder(TezOperPlan plan) {
            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
        }

        public int getMaxOutputsinSingleVertex() {
            return maxOutputsinSingleVertex;
        }

        public int getTotalVertices() {
            return totalVertices;
        }

        @Override
        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
            // Vertex groups are excluded from both figures.
            if (tezOperator.isVertexGroup()) {
                return;
            }
            totalVertices++;
            maxOutputsinSingleVertex =
                    Math.max(maxOutputsinSingleVertex, tezOperator.outEdges.size());
        }
    }

    public DAG getDAG() {
        return dag;
    }

    public String getName() {
        return dag.getName();
    }

    public Configuration getConfiguration() {
        return conf;
    }

    /** @return the application id recorded after a successful submission, else {@code null} */
    public ApplicationId getApplicationId() {
        return appId;
    }

    /** @return the most recently retrieved DAG status, or {@code null} if none was retrieved yet */
    public DAGStatus getDAGStatus() {
        return dagStatus;
    }

    /** @return the counters recorded when the DAG completed, or {@code null} before that */
    public TezCounters getDAGCounters() {
        return dagCounters;
    }

    /**
     * @return succeeded tasks divided by total tasks for the whole DAG; 0 when no
     *         status or progress is available yet or the total is not positive
     */
    public float getDAGProgress() {
        DAGStatus status = dagStatus;
        return status == null ? 0f : fractionDone(status.getDAGProgress());
    }

    /**
     * @return a new map from vertex name to succeeded/total task ratio; empty when
     *         no status or per-vertex progress is available yet
     */
    public Map<String, Float> getVertexProgress() {
        Map<String, Float> vertexProgress = Maps.newHashMap();
        DAGStatus status = dagStatus;
        if (status == null) {
            return vertexProgress;
        }
        Map<String, Progress> perVertex = status.getVertexProgress();
        if (perVertex == null) {
            return vertexProgress;
        }
        for (Map.Entry<String, Progress> entry : perVertex.entrySet()) {
            vertexProgress.put(entry.getKey(), fractionDone(entry.getValue()));
        }
        return vertexProgress;
    }

    /**
     * Converts a progress record into a succeeded/total ratio. Returns 0 rather
     * than NaN or a negative value when the record is missing or the total task
     * count is not positive.
     */
    private static float fractionDone(Progress p) {
        if (p == null) {
            return 0f;
        }
        float total = (float) p.getTotalTaskCount();
        if (total <= 0f) {
            return 0f;
        }
        return (float) p.getSucceededTaskCount() / total;
    }

    /**
     * Fetches the status (including counters) of one vertex.
     *
     * @param vertexName name of the vertex to query
     * @return the vertex status, or {@code null} if it could not be retrieved
     */
    public VertexStatus getVertexStatus(String vertexName) {
        VertexStatus vs = null;
        try {
            vs = dagClient.getVertexStatus(vertexName, statusGetOpts);
        } catch (Exception e) {
            // Don't fail the job even if vertex status couldn't
            // be retrieved.
            log.warn("Cannot retrieve status for vertex " + vertexName, e);
        }
        return vs;
    }

    public void setPigStats(TezPigScriptStats pigStats) {
        this.pigStats = pigStats;
    }

    /**
     * Submits the DAG and monitors it until it completes or its status can no
     * longer be retrieved. A submission failure is logged and ends the method
     * without throwing. The status reporter timer is always cancelled before this
     * method returns, even if monitoring fails with an unexpected exception.
     */
    @Override
    public void run() {
        UDFContext udfContext = UDFContext.getUDFContext();
        if (!submitDAG()) {
            return;
        }

        timer = new Timer();
        try {
            timer.schedule(new DAGStatusReporter(), STATUS_REPORT_INITIAL_DELAY_MS,
                    statusReportPeriodMillis());
            monitorDAG(udfContext);
        } finally {
            // Without this a failure above would leave the timer thread running.
            timer.cancel();
        }
    }

    /** Obtains a Tez client and submits the DAG; returns false (after logging) on failure. */
    private boolean submitDAG() {
        try {
            tezClient = TezSessionManager.getClient(conf, requestAMResources,
                    dag.getCredentials(), tezJobConf);
            log.info("Submitting DAG " + dag.getName());
            dagClient = tezClient.submitDAG(dag);
            appId = tezClient.getAppMasterApplicationId();
            log.info("Submitted DAG " + dag.getName() + ". Application id: " + appId);
            return true;
        } catch (Exception e) {
            TezClient client = tezClient;
            if (client != null) {
                log.error("Cannot submit DAG - Application id: " + client.getAppMasterApplicationId(), e);
            } else {
                log.error("Cannot submit DAG", e);
            }
            return false;
        }
    }

    /**
     * Reads the configured status report interval and converts it to milliseconds.
     * {@link Timer#schedule(TimerTask, long, long)} rejects a non-positive period,
     * so such a setting is replaced by the default instead of aborting the job.
     */
    private long statusReportPeriodMillis() {
        long seconds = conf.getLong(PigConfiguration.PIG_TEZ_DAG_STATUS_REPORT_INTERVAL,
                DEFAULT_STATUS_REPORT_INTERVAL_SECS);
        if (seconds <= 0) {
            log.warn("Ignoring non-positive value " + seconds + " for "
                    + PigConfiguration.PIG_TEZ_DAG_STATUS_REPORT_INTERVAL + "; using "
                    + DEFAULT_STATUS_REPORT_INTERVAL_SECS + " seconds");
            seconds = DEFAULT_STATUS_REPORT_INTERVAL_SECS;
        }
        return seconds * 1000;
    }

    /** Polls the DAG status until the DAG completes or the status cannot be retrieved. */
    private void monitorDAG(UDFContext udfContext) {
        while (true) {
            DAGStatus current;
            try {
                current = dagClient.getDAGStatus(null);
            } catch (Exception e) {
                log.info("Cannot retrieve DAG status", e);
                return;
            }
            dagStatus = current;

            if (current.isCompleted()) {
                finishCompletedDAG(udfContext);
                return;
            }

            try {
                Thread.sleep(STATUS_POLL_INTERVAL_MS);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: polling simply continues with the next
                // iteration. The interrupt flag is not restored because that would
                // make every following sleep return immediately.
            }
        }
    }

    /**
     * Handles a completed DAG: re-reads the status with counters, restores the
     * caller's UDFContext, records the counters, frees the session, accumulates
     * statistics and, unless sessions are reused, stops the session.
     */
    private void finishCompletedDAG(UDFContext udfContext) {
        try {
            dagStatus = dagClient.getDAGStatus(statusGetOpts);
        } catch (Exception e) {
            // Keep the status from the last poll; it just lacks counters.
            log.warn("Failed to retrieve DAG counters", e);
        }
        // For tez_local mode where PigProcessor destroys all UDFContext
        UDFContext.setUdfContext(udfContext);

        DAGStatus finalStatus = dagStatus;
        log.info("DAG Status: " + finalStatus);
        dagCounters = finalStatus.getDAGCounters();
        TezSessionManager.freeSession(tezClient);

        try {
            pigStats.accumulateStats(this);
        } catch (Exception e) {
            log.warn("Exception while gathering stats", e);
        }

        try {
            if (!reuseSession) {
                TezSessionManager.stopSession(tezClient);
            }
            tezClient = null;
            dagClient = null;
        } catch (Exception e) {
            log.info("Cannot stop Tez session", e);
        }
    }

    /** Timer task that logs the DAG status whenever its string form has changed. */
    private class DAGStatusReporter extends TimerTask {
        private String prevDAGStatus;

        @Override
        public void run() {
            // Single read of the shared field so the null check and the use agree.
            DAGStatus status = dagStatus;
            if (status == null) {
                return;
            }
            String currDAGStatus = status.toString();
            if (!currDAGStatus.equals(prevDAGStatus)) {
                log.info("DAG Status: " + currDAGStatus);
                prevDAGStatus = currDAGStatus;
            }
        }
    }

    /**
     * Asks Tez to kill the DAG and then stops the Tez client, skipping whichever
     * of the two is not currently set.
     *
     * @throws IOException if the kill or stop fails; a {@link TezException} is
     *         wrapped together with the application id
     */
    public void killJob() throws IOException {
        try {
            // Each field is read once into a local because run() may clear it
            // between a null check and the call.
            DAGClient client = dagClient;
            if (client != null) {
                client.tryKillDAG();
            }
            TezClient session = tezClient;
            if (session != null) {
                session.stop();
            }
        } catch (TezException e) {
            throw new IOException("Cannot kill DAG - Application Id: " + appId, e);
        }
    }

    /**
     * Returns the DAG diagnostics joined with newlines. If no status has been
     * retrieved yet but a DAG client exists, the status is fetched first.
     *
     * @return the joined diagnostics, or an empty string when no status is
     *         available or retrieving it fails
     */
    public String getDiagnostics() {
        try {
            DAGClient client = dagClient;
            if (client != null && dagStatus == null) {
                dagStatus = client.getDAGStatus(null);
            }
            DAGStatus status = dagStatus;
            if (status != null) {
                return StringUtils.join(status.getDiagnostics(), "\n");
            }
        } catch (Exception e) {
            // Best effort: diagnostics are optional, so only note the failure.
            log.debug("Cannot retrieve DAG diagnostics", e);
        }
        return "";
    }
}