/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ambari.eventdb.model.WorkflowContext;
import org.apache.ambari.eventdb.model.WorkflowDag;
import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
import org.apache.ambari.log4j.common.LogStoreUpdateProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.HistoryEvent;
import org.apache.hadoop.tools.rumen.JhCounter;
import org.apache.hadoop.tools.rumen.JhCounterGroup;
import org.apache.hadoop.tools.rumen.JhCounters;
import org.apache.hadoop.tools.rumen.JobFinishedEvent;
import org.apache.hadoop.tools.rumen.JobInfoChangeEvent;
import org.apache.hadoop.tools.rumen.JobInitedEvent;
import org.apache.hadoop.tools.rumen.JobStatusChangedEvent;
import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
import org.apache.hadoop.tools.rumen.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.tools.rumen.MapAttemptFinishedEvent;
import org.apache.hadoop.tools.rumen.ReduceAttemptFinishedEvent;
import org.apache.hadoop.tools.rumen.TaskAttemptFinishedEvent;
import org.apache.hadoop.tools.rumen.TaskAttemptStartedEvent;
import org.apache.hadoop.tools.rumen.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.tools.rumen.TaskFailedEvent;
import org.apache.hadoop.tools.rumen.TaskFinishedEvent;
import org.apache.hadoop.tools.rumen.TaskStartedEvent;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.spi.LoggingEvent;
import org.codehaus.jackson.map.ObjectMapper;
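/**
 * A {@link LogStoreUpdateProvider} that persists Hadoop MapReduce job-history
 * events into a relational store. Rumen-parsed {@link HistoryEvent}s arriving
 * wrapped in log4j {@link LoggingEvent}s are mapped to prepared INSERT/UPDATE
 * statements against the workflow, job, task, and taskAttempt tables.
 */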
public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
private static final Log LOG =
LogFactory.getLog(MapReduceJobHistoryUpdater.class);
private Connection connection;
private static final String WORKFLOW_TABLE = "workflow";
private static final String JOB_TABLE = "job";
private static final String TASK_TABLE = "task";
private static final String TASKATTEMPT_TABLE = "taskAttempt";
private PreparedStatement workflowPS = null;
private PreparedStatement workflowSelectPS = null;
private PreparedStatement workflowUpdateTimePS = null;
private PreparedStatement workflowUpdateNumCompletedPS = null;
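// Each supported Rumen event class maps to the prepared statement that
// persists it; doUpdates() looks the statement up by the event's class.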
private Map<Class<? extends HistoryEvent>, PreparedStatement> entitySqlMap =
new HashMap<Class<? extends HistoryEvent>, PreparedStatement>();
@Override
public void init(Connection connection) throws IOException {
this.connection = connection;
try {
initializePreparedStatements();
} catch (SQLException sqle) {
throw new IOException(sqle);
}
}
private void initializePreparedStatements() throws SQLException {
initializeJobPreparedStatements();
initializeTaskPreparedStatements();
initializeTaskAttemptPreparedStatements();
}
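// Post-completion aggregate: fills in mapsRuntime/reducesRuntime for a job
// by summing task-attempt durations (see updateJobStatsAtFinish).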
private PreparedStatement jobEndUpdate;
private void initializeJobPreparedStatements() throws SQLException {
/**
* Job events
*/
// JobSubmittedEvent
PreparedStatement jobSubmittedPrepStmnt =
connection.prepareStatement(
"INSERT INTO " +
JOB_TABLE +
" (" +
"jobId, " +
"jobName, " +
"userName, " +
"confPath, " +
"queue, " +
"submitTime, " +
"workflowId, " +
"workflowEntityName " +
") " +
"VALUES" +
" (?, ?, ?, ?, ?, ?, ?, ?)"
);
entitySqlMap.put(JobSubmittedEvent.class, jobSubmittedPrepStmnt);
workflowSelectPS =
connection.prepareStatement(
"SELECT workflowContext FROM " + WORKFLOW_TABLE + " where workflowId = ?"
);
workflowPS =
connection.prepareStatement(
"INSERT INTO " +
WORKFLOW_TABLE +
" (" +
"workflowId, " +
"workflowName, " +
"workflowContext, " +
"userName, " +
"startTime, " +
"lastUpdateTime, " +
"duration, " +
"numJobsTotal, " +
"numJobsCompleted" +
") " +
"VALUES" +
" (?, ?, ?, ?, ?, ?, 0, ?, 0)"
);
workflowUpdateTimePS =
connection.prepareStatement(
"UPDATE " +
WORKFLOW_TABLE +
" SET " +
"workflowContext = ?, " +
"numJobsTotal = ?, " +
"lastUpdateTime = ?, " +
"duration = ? - (SELECT startTime FROM " +
WORKFLOW_TABLE +
" WHERE workflowId = ?) " +
"WHERE workflowId = ?"
);
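// PostgreSQL-style UPDATE ... FROM: recomputes numJobsCompleted, inputBytes,
// and outputBytes for a workflow by aggregating over its jobs that finished
// with status 'SUCCESS'.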
workflowUpdateNumCompletedPS =
connection.prepareStatement(
"UPDATE " +
WORKFLOW_TABLE +
" SET " +
"lastUpdateTime = ?, " +
"duration = ? - (SELECT startTime FROM " +
WORKFLOW_TABLE +
" WHERE workflowId = selectid), " +
"numJobsCompleted = rows, " +
"inputBytes = input, " +
"outputBytes = output " +
"FROM (SELECT count(*) as rows, sum(inputBytes) as input, " +
"sum(outputBytes) as output, workflowId as selectid FROM " +
JOB_TABLE +
" WHERE workflowId = (SELECT workflowId FROM " +
JOB_TABLE +
" WHERE jobId = ?) AND status = 'SUCCESS' " +
"GROUP BY workflowId) as jobsummary " +
"WHERE workflowId = selectid"
);
// JobFinishedEvent
PreparedStatement jobFinishedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
JOB_TABLE +
" SET " +
"finishTime = ?, " +
"finishedMaps = ?, " +
"finishedReduces= ?, " +
"failedMaps = ?, " +
"failedReduces = ?, " +
"inputBytes = ?, " +
"outputBytes = ? " +
"WHERE " +
"jobId = ?"
);
entitySqlMap.put(JobFinishedEvent.class, jobFinishedPrepStmnt);
// JobInitedEvent
PreparedStatement jobInitedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
JOB_TABLE +
" SET " +
"launchTime = ?, " +
"maps = ?, " +
"reduces = ?, " +
"status = ? "+
"WHERE " +
"jobId = ?"
);
entitySqlMap.put(JobInitedEvent.class, jobInitedPrepStmnt);
// JobStatusChangedEvent
PreparedStatement jobStatusChangedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
JOB_TABLE +
" SET " +
"status = ? "+
"WHERE " +
"jobId = ?"
);
entitySqlMap.put(JobStatusChangedEvent.class, jobStatusChangedPrepStmnt);
// JobInfoChangedEvent
PreparedStatement jobInfoChangedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
JOB_TABLE +
" SET " +
"submitTime = ?, " +
"launchTime = ? " +
"WHERE " +
"jobId = ?"
);
entitySqlMap.put(JobInfoChangeEvent.class, jobInfoChangedPrepStmnt);
// JobUnsuccessfulCompletionEvent
PreparedStatement jobUnsuccessfulPrepStmnt =
connection.prepareStatement(
"UPDATE " +
JOB_TABLE +
" SET " +
"finishTime = ?, " +
"finishedMaps = ?, " +
"finishedReduces = ?, " +
"status = ? " +
"WHERE " +
"jobId = ?"
);
entitySqlMap.put(
JobUnsuccessfulCompletionEvent.class, jobUnsuccessfulPrepStmnt);
// Job update at the end
jobEndUpdate =
connection.prepareStatement(
"UPDATE " +
JOB_TABLE +
" SET " +
" mapsRuntime = (" +
"SELECT " +
"SUM(" +
TASKATTEMPT_TABLE + ".finishTime" + " - " +
TASKATTEMPT_TABLE + ".startTime" +
")" +
" FROM " +
TASKATTEMPT_TABLE +
" WHERE " +
TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
" AND " +
TASKATTEMPT_TABLE + ".taskType = ?)" +
", " +
" reducesRuntime = (" +
"SELECT SUM(" +
TASKATTEMPT_TABLE + ".finishTime" + " - " +
TASKATTEMPT_TABLE + ".startTime" +
")" +
" FROM " +
TASKATTEMPT_TABLE +
" WHERE " +
TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
" AND " +
TASKATTEMPT_TABLE + ".taskType = ?) " +
" WHERE " +
"jobId = ?"
);
}
private void initializeTaskPreparedStatements() throws SQLException {
/**
* Task events
*/
// TaskStartedEvent
PreparedStatement taskStartedPrepStmnt =
connection.prepareStatement(
"INSERT INTO " +
TASK_TABLE +
" (" +
"jobId, " +
"taskType, " +
"splits, " +
"startTime, " +
"taskId" +
") " +
"VALUES (?, ?, ?, ?, ?)"
);
entitySqlMap.put(TaskStartedEvent.class, taskStartedPrepStmnt);
// TaskFinishedEvent
PreparedStatement taskFinishedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
TASK_TABLE +
" SET " +
"jobId = ?, " +
"taskType = ?, " +
"status = ?, " +
"finishTime = ? " +
" WHERE " +
"taskId = ?"
);
entitySqlMap.put(TaskFinishedEvent.class, taskFinishedPrepStmnt);
// TaskFailedEvent
PreparedStatement taskFailedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
TASK_TABLE +
" SET " +
"jobId = ?, " +
"taskType = ?, " +
"status = ?, " +
"finishTime = ?, " +
"error = ?, " +
"failedAttempt = ? " +
"WHERE " +
"taskId = ?"
);
entitySqlMap.put(TaskFailedEvent.class, taskFailedPrepStmnt);
}
private void initializeTaskAttemptPreparedStatements() throws SQLException {
/**
* TaskAttempt events
*/
// TaskAttemptStartedEvent
PreparedStatement taskAttemptStartedPrepStmnt =
connection.prepareStatement(
"INSERT INTO " +
TASKATTEMPT_TABLE +
" (" +
"jobId, " +
"taskId, " +
"taskType, " +
"startTime, " +
"taskTracker, " +
"locality, " +
"avataar, " +
"taskAttemptId" +
") " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
);
entitySqlMap.put(
TaskAttemptStartedEvent.class, taskAttemptStartedPrepStmnt);
// TaskAttemptFinishedEvent
PreparedStatement taskAttemptFinishedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
TASKATTEMPT_TABLE +
" SET " +
"jobId = ?, " +
"taskId = ?, " +
"taskType = ?, " +
"finishTime = ?, " +
"status = ?, " +
"taskTracker = ? " +
" WHERE " +
"taskAttemptId = ?"
);
entitySqlMap.put(
TaskAttemptFinishedEvent.class, taskAttemptFinishedPrepStmnt);
// TaskAttemptUnsuccessfulCompletionEvent
PreparedStatement taskAttemptUnsuccessfulPrepStmnt =
connection.prepareStatement(
"UPDATE " +
TASKATTEMPT_TABLE +
" SET " +
"jobId = ?, " +
"taskId = ?, " +
"taskType = ?, " +
"finishTime = ?, " +
"status = ?, " +
"taskTracker = ?, " +
"error = ? " +
" WHERE " +
"taskAttemptId = ?"
);
entitySqlMap.put(
TaskAttemptUnsuccessfulCompletionEvent.class,
taskAttemptUnsuccessfulPrepStmnt);
// MapAttemptFinishedEvent
PreparedStatement mapAttemptFinishedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
TASKATTEMPT_TABLE +
" SET " +
"jobId = ?, " +
"taskId = ?, " +
"taskType = ?, " +
"mapFinishTime = ?, " +
"finishTime = ?, " +
"inputBytes = ?, " +
"outputBytes = ?, " +
"status = ?, " +
"taskTracker = ? " +
" WHERE " +
"taskAttemptId = ?"
);
entitySqlMap.put(
MapAttemptFinishedEvent.class, mapAttemptFinishedPrepStmnt);
// ReduceAttemptFinishedEvent
PreparedStatement reduceAttemptFinishedPrepStmnt =
connection.prepareStatement(
"UPDATE " +
TASKATTEMPT_TABLE +
" SET " +
"jobId = ?, " +
"taskId = ?, " +
"taskType = ?, " +
"shuffleFinishTime = ?, " +
"sortFinishTime = ?, " +
"finishTime = ?, " +
"inputBytes = ?, " +
"outputBytes = ?, " +
"status = ?, " +
"taskTracker = ? " +
" WHERE " +
"taskAttemptId = ?"
);
entitySqlMap.put(
ReduceAttemptFinishedEvent.class, reduceAttemptFinishedPrepStmnt);
}
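// Dispatches a parsed history event to its event-specific handler, using the
// prepared statement registered for the event's class; unknown event types
// are logged at debug level and skipped.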
private void doUpdates(LoggingEvent originalEvent,
Object parsedEvent) throws SQLException {
Class<?> eventClass = parsedEvent.getClass();
PreparedStatement entityPS = entitySqlMap.get(eventClass);
if (entityPS == null) {
LOG.debug("No prepared statement for " + eventClass);
return;
}
if (eventClass == JobSubmittedEvent.class) {
processJobSubmittedEvent(entityPS, workflowSelectPS, workflowPS,
workflowUpdateTimePS, originalEvent,
(JobSubmittedEvent)parsedEvent);
} else if (eventClass == JobFinishedEvent.class) {
processJobFinishedEvent(entityPS, workflowUpdateNumCompletedPS,
originalEvent, (JobFinishedEvent)parsedEvent);
} else if (eventClass == JobInitedEvent.class){
processJobInitedEvent(entityPS,
originalEvent, (JobInitedEvent)parsedEvent);
} else if (eventClass == JobStatusChangedEvent.class) {
processJobStatusChangedEvent(entityPS,
originalEvent, (JobStatusChangedEvent)parsedEvent);
} else if (eventClass == JobInfoChangeEvent.class) {
processJobInfoChangeEvent(entityPS,
originalEvent, (JobInfoChangeEvent)parsedEvent);
} else if (eventClass == JobUnsuccessfulCompletionEvent.class) {
processJobUnsuccessfulEvent(entityPS,
originalEvent, (JobUnsuccessfulCompletionEvent)parsedEvent);
} else if (eventClass == TaskStartedEvent.class) {
processTaskStartedEvent(entityPS,
originalEvent, (TaskStartedEvent)parsedEvent);
} else if (eventClass == TaskFinishedEvent.class) {
processTaskFinishedEvent(entityPS,
originalEvent, (TaskFinishedEvent)parsedEvent);
} else if (eventClass == TaskFailedEvent.class) {
processTaskFailedEvent(entityPS,
originalEvent, (TaskFailedEvent)parsedEvent);
} else if (eventClass == TaskAttemptStartedEvent.class) {
processTaskAttemptStartedEvent(entityPS,
originalEvent, (TaskAttemptStartedEvent)parsedEvent);
} else if (eventClass == TaskAttemptFinishedEvent.class) {
processTaskAttemptFinishedEvent(entityPS,
originalEvent, (TaskAttemptFinishedEvent)parsedEvent);
} else if (eventClass == TaskAttemptUnsuccessfulCompletionEvent.class) {
processTaskAttemptUnsuccessfulEvent(entityPS,
originalEvent, (TaskAttemptUnsuccessfulCompletionEvent)parsedEvent);
} else if (eventClass == MapAttemptFinishedEvent.class) {
processMapAttemptFinishedEvent(entityPS,
originalEvent, (MapAttemptFinishedEvent)parsedEvent);
} else if (eventClass == ReduceAttemptFinishedEvent.class) {
processReduceAttemptFinishedEvent(entityPS,
originalEvent, (ReduceAttemptFinishedEvent)parsedEvent);
}
}
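// Called once a job reaches a terminal state: sums (finishTime - startTime)
// over the job's MAP and REDUCE attempts into mapsRuntime/reducesRuntime.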
private void updateJobStatsAtFinish(String jobId) {
try {
jobEndUpdate.setString(1, "MAP");
jobEndUpdate.setString(2, "REDUCE");
jobEndUpdate.setString(3, jobId);
jobEndUpdate.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to update mapsRuntime/reducesRuntime for " + jobId,
sqle);
}
}
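// Fallback for jobs submitted without workflow properties: synthesizes a
// one-node workflow ("X") whose id is derived from the job id
// (job_... becomes mr_...).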
private static WorkflowContext generateWorkflowContext(
JobSubmittedEvent historyEvent) {
WorkflowDag wfDag = new WorkflowDag();
WorkflowDagEntry wfDagEntry = new WorkflowDagEntry();
wfDagEntry.setSource("X");
wfDag.addEntry(wfDagEntry);
WorkflowContext wc = new WorkflowContext();
wc.setWorkflowId(historyEvent.getJobId().toString().replace("job_", "mr_"));
wc.setWorkflowName(historyEvent.getJobName());
wc.setWorkflowEntityName("X");
wc.setWorkflowDag(wfDag);
return wc;
}
// this is based on the regex in org.apache.hadoop.tools.rumen.ParsedLine
// except this assumes the format "key"="value" so that both key and value
// are quoted and may contain escaped characters
private static final Pattern adjPattern =
Pattern.compile("\"([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)\"" + "=" +
"\"([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)\" ");
public static WorkflowContext buildWorkflowContext(JobSubmittedEvent historyEvent) {
String workflowId = historyEvent.getWorkflowId()
.replace("\\", "");
if (workflowId.isEmpty())
return generateWorkflowContext(historyEvent);
String workflowName = historyEvent.getWorkflowName()
.replace("\\", "");
String workflowNodeName = historyEvent.getWorkflowNodeName()
.replace("\\", "");
String workflowAdjacencies = StringUtils.unEscapeString(
historyEvent.getWorkflowAdjacencies(),
StringUtils.ESCAPE_CHAR, new char[] {'"', '=', '.'});
WorkflowContext context = new WorkflowContext();
context.setWorkflowId(workflowId);
context.setWorkflowName(workflowName);
context.setWorkflowEntityName(workflowNodeName);
WorkflowDag dag = new WorkflowDag();
Matcher matcher = adjPattern.matcher(workflowAdjacencies);
while(matcher.find()){
WorkflowDagEntry dagEntry = new WorkflowDagEntry();
dagEntry.setSource(matcher.group(1).replace("\\", ""));
String[] values = StringUtils.getStrings(
matcher.group(2).replace("\\", ""));
if (values != null) {
for (String target : values) {
dagEntry.addTarget(target);
}
}
dag.addEntry(dagEntry);
}
if (dag.getEntries().isEmpty()) {
WorkflowDagEntry wfDagEntry = new WorkflowDagEntry();
wfDagEntry.setSource(workflowNodeName);
dag.addEntry(wfDagEntry);
}
context.setWorkflowDag(dag);
return context;
}
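// Folds a list of dag entries into an adjacency map, unioning target sets
// for entries that share a source node.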
public static void mergeEntries(Map<String, Set<String>> edges, List<WorkflowDagEntry> entries) {
if (entries == null)
return;
for (WorkflowDagEntry entry : entries) {
if (!edges.containsKey(entry.getSource()))
edges.put(entry.getSource(), new TreeSet<String>());
Set<String> targets = edges.get(entry.getSource());
targets.addAll(entry.getTargets());
}
}
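// Merges two dags edge-by-edge; since the merge is a set union, the order
// in which the two contexts are passed does not matter.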
public static WorkflowDag constructMergedDag(WorkflowContext workflowContext, WorkflowContext existingWorkflowContext) {
Map<String, Set<String>> edges = new TreeMap<String, Set<String>>();
if (existingWorkflowContext.getWorkflowDag() != null)
mergeEntries(edges, existingWorkflowContext.getWorkflowDag().getEntries());
if (workflowContext.getWorkflowDag() != null)
mergeEntries(edges, workflowContext.getWorkflowDag().getEntries());
WorkflowDag mergedDag = new WorkflowDag();
for (Entry<String,Set<String>> edge : edges.entrySet()) {
WorkflowDagEntry entry = new WorkflowDagEntry();
entry.setSource(edge.getKey());
entry.getTargets().addAll(edge.getValue());
mergedDag.addEntry(entry);
}
return mergedDag;
}
private static WorkflowContext getSanitizedWorkflow(WorkflowContext workflowContext, WorkflowContext existingWorkflowContext) {
WorkflowContext sanitizedWC = new WorkflowContext();
if (existingWorkflowContext == null) {
sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag());
sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext());
} else {
sanitizedWC.setWorkflowDag(constructMergedDag(existingWorkflowContext, workflowContext));
sanitizedWC.setParentWorkflowContext(existingWorkflowContext.getParentWorkflowContext());
}
return sanitizedWC;
}
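// Serializes a workflow context to JSON for storage in the workflowContext
// column; returns "" if serialization fails.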
private static String getWorkflowString(WorkflowContext sanitizedWC) {
String sanitizedWCString = null;
try {
ObjectMapper om = new ObjectMapper();
sanitizedWCString = om.writeValueAsString(sanitizedWC);
} catch (IOException e) {
LOG.error("Failed to serialize workflow context as JSON", e);
sanitizedWCString = "";
}
return sanitizedWCString;
}
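// Select-then-upsert: looks up the workflow row for this job's workflow id,
// inserting a new row if absent or merging this job's dag into the stored
// context otherwise, then inserts the job row itself.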
private void processJobSubmittedEvent(
PreparedStatement jobPS,
PreparedStatement workflowSelectPS, PreparedStatement workflowPS,
PreparedStatement workflowUpdateTimePS, LoggingEvent logEvent,
JobSubmittedEvent historyEvent) {
try {
String jobId = historyEvent.getJobId().toString();
jobPS.setString(1, jobId);
jobPS.setString(2, historyEvent.getJobName());
jobPS.setString(3, historyEvent.getUserName());
jobPS.setString(4, historyEvent.getJobConfPath());
jobPS.setString(5, historyEvent.getJobQueueName());
jobPS.setLong(6, historyEvent.getSubmitTime());
WorkflowContext workflowContext = buildWorkflowContext(historyEvent);
// Get workflow information
boolean insertWorkflow = false;
String existingContextString = null;
ResultSet rs = null;
try {
workflowSelectPS.setString(1, workflowContext.getWorkflowId());
workflowSelectPS.execute();
rs = workflowSelectPS.getResultSet();
if (rs.next()) {
existingContextString = rs.getString(1);
} else {
insertWorkflow = true;
}
} catch (SQLException sqle) {
LOG.warn("workflow select failed with: ", sqle);
insertWorkflow = false;
} finally {
try {
if (rs != null)
rs.close();
} catch (SQLException e) {
LOG.error("Exception while closing ResultSet", e);
}
}
// Insert workflow
if (insertWorkflow) {
workflowPS.setString(1, workflowContext.getWorkflowId());
workflowPS.setString(2, workflowContext.getWorkflowName());
workflowPS.setString(3, getWorkflowString(getSanitizedWorkflow(workflowContext, null)));
workflowPS.setString(4, historyEvent.getUserName());
workflowPS.setLong(5, historyEvent.getSubmitTime());
workflowPS.setLong(6, historyEvent.getSubmitTime());
workflowPS.setLong(7, workflowContext.getWorkflowDag().size());
workflowPS.executeUpdate();
LOG.debug("Successfully inserted workflowId = " +
workflowContext.getWorkflowId());
} else {
ObjectMapper om = new ObjectMapper();
WorkflowContext existingWorkflowContext = null;
try {
if (existingContextString != null)
existingWorkflowContext = om.readValue(existingContextString.getBytes(), WorkflowContext.class);
} catch (IOException e) {
LOG.warn("Couldn't read existing workflow context for " + workflowContext.getWorkflowId(), e);
}
WorkflowContext sanitizedWC = getSanitizedWorkflow(workflowContext, existingWorkflowContext);
workflowUpdateTimePS.setString(1, getWorkflowString(sanitizedWC));
workflowUpdateTimePS.setLong(2, sanitizedWC.getWorkflowDag().size());
workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime());
workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime());
workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId());
workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId());
workflowUpdateTimePS.executeUpdate();
LOG.debug("Successfully updated workflowId = " +
workflowContext.getWorkflowId());
}
// Insert job
jobPS.setString(7, workflowContext.getWorkflowId());
jobPS.setString(8, workflowContext.getWorkflowEntityName());
jobPS.executeUpdate();
LOG.debug("Successfully inserted job = " + jobId +
" and workflowId = " + workflowContext.getWorkflowId());
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
} catch (Exception e) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobId() + " into " + JOB_TABLE, e);
}
}
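// Derives inputBytes from the map-phase HDFS_BYTES_READ counter and
// outputBytes from HDFS_BYTES_WRITTEN (taken from the reduce counters when
// the job ran reduces, otherwise from the map counters), then updates the
// job row and the workflow's completion aggregates.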
private void processJobFinishedEvent(
PreparedStatement entityPS,
PreparedStatement workflowUpdateNumCompletedPS,
LoggingEvent logEvent, JobFinishedEvent historyEvent) {
Counters counters = historyEvent.getMapCounters();
long inputBytes = 0;
if (counters != null) {
for (CounterGroup group : counters) {
for (Counter counter : group) {
if (counter.getName().equals("HDFS_BYTES_READ"))
inputBytes += counter.getValue();
}
}
}
if (historyEvent.getFinishedReduces() != 0)
counters = historyEvent.getReduceCounters();
long outputBytes = 0;
if (counters != null) {
for (CounterGroup group : counters) {
for (Counter counter : group) {
if (counter.getName().equals("HDFS_BYTES_WRITTEN"))
outputBytes += counter.getValue();
}
}
}
try {
entityPS.setLong(1, historyEvent.getFinishTime());
entityPS.setInt(2, historyEvent.getFinishedMaps());
entityPS.setInt(3, historyEvent.getFinishedReduces());
entityPS.setInt(4, historyEvent.getFailedMaps());
entityPS.setInt(5, historyEvent.getFailedReduces());
entityPS.setLong(6, inputBytes);
entityPS.setLong(7, outputBytes);
entityPS.setString(8, historyEvent.getJobid().toString());
entityPS.executeUpdate();
// job finished events always have success status
workflowUpdateNumCompletedPS.setLong(1, historyEvent.getFinishTime());
workflowUpdateNumCompletedPS.setLong(2, historyEvent.getFinishTime());
workflowUpdateNumCompletedPS.setString(3, historyEvent.getJobid().toString());
workflowUpdateNumCompletedPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobid() + " into " + JOB_TABLE, sqle);
}
updateJobStatsAtFinish(historyEvent.getJobid().toString());
}
private void processJobInitedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, JobInitedEvent historyEvent) {
try {
entityPS.setLong(1, historyEvent.getLaunchTime());
entityPS.setInt(2, historyEvent.getTotalMaps());
entityPS.setInt(3, historyEvent.getTotalReduces());
entityPS.setString(4, historyEvent.getStatus());
entityPS.setString(5, historyEvent.getJobId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
}
}
private void processJobStatusChangedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, JobStatusChangedEvent historyEvent) {
try {
entityPS.setString(1, historyEvent.getStatus());
entityPS.setString(2, historyEvent.getJobId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
}
}
private void processJobInfoChangeEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, JobInfoChangeEvent historyEvent) {
try {
entityPS.setLong(1, historyEvent.getSubmitTime());
entityPS.setLong(2, historyEvent.getLaunchTime());
entityPS.setString(3, historyEvent.getJobId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
}
}
private void processJobUnsuccessfulEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, JobUnsuccessfulCompletionEvent historyEvent) {
try {
entityPS.setLong(1, historyEvent.getFinishTime());
entityPS.setLong(2, historyEvent.getFinishedMaps());
entityPS.setLong(3, historyEvent.getFinishedReduces());
entityPS.setString(4, historyEvent.getStatus());
entityPS.setString(5, historyEvent.getJobId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
}
updateJobStatsAtFinish(historyEvent.getJobId().toString());
}
private void processTaskStartedEvent(PreparedStatement entityPS,
LoggingEvent logEvent, TaskStartedEvent historyEvent) {
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskType().toString());
entityPS.setString(3, historyEvent.getSplitLocations());
entityPS.setLong(4, historyEvent.getStartTime());
entityPS.setString(5, historyEvent.getTaskId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for task " +
historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
}
}
private void processTaskFinishedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, TaskFinishedEvent historyEvent) {
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskType().toString());
entityPS.setString(3, historyEvent.getTaskStatus());
entityPS.setLong(4, historyEvent.getFinishTime());
entityPS.setString(5, historyEvent.getTaskId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for task " +
historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
}
}
private void processTaskFailedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, TaskFailedEvent historyEvent) {
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskType().toString());
entityPS.setString(3, historyEvent.getTaskStatus());
entityPS.setLong(4, historyEvent.getFinishTime());
entityPS.setString(5, historyEvent.getError());
if (historyEvent.getFailedAttemptID() != null) {
entityPS.setString(6, historyEvent.getFailedAttemptID().toString());
} else {
entityPS.setString(6, "task_na");
}
entityPS.setString(7, historyEvent.getTaskId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() + " for task " +
historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
}
}
private void processTaskAttemptStartedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, TaskAttemptStartedEvent historyEvent) {
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskId().toString());
entityPS.setString(3, historyEvent.getTaskType().toString());
entityPS.setLong(4, historyEvent.getStartTime());
entityPS.setString(5, historyEvent.getTrackerName());
entityPS.setString(6, historyEvent.getLocality().toString());
entityPS.setString(7, historyEvent.getAvataar().toString());
entityPS.setString(8, historyEvent.getTaskAttemptId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() +
" for taskAttempt " + historyEvent.getTaskAttemptId() +
" into " + TASKATTEMPT_TABLE, sqle);
}
}
private void processTaskAttemptFinishedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, TaskAttemptFinishedEvent historyEvent) {
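// MAP and REDUCE attempts are persisted by the richer
// MapAttemptFinishedEvent/ReduceAttemptFinishedEvent handlers; this generic
// handler only records the remaining attempt types.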
if (historyEvent.getTaskType() == TaskType.MAP ||
historyEvent.getTaskType() == TaskType.REDUCE) {
LOG.debug("Ignoring TaskAttemptFinishedEvent for " +
historyEvent.getTaskType());
return;
}
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskId().toString());
entityPS.setString(3, historyEvent.getTaskType().toString());
entityPS.setLong(4, historyEvent.getFinishTime());
entityPS.setString(5, historyEvent.getTaskStatus());
entityPS.setString(6, historyEvent.getHostname());
entityPS.setString(7, historyEvent.getAttemptId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() +
" for taskAttempt " + historyEvent.getAttemptId() +
" into " + TASKATTEMPT_TABLE, sqle);
}
}
private void processTaskAttemptUnsuccessfulEvent(
PreparedStatement entityPS,
LoggingEvent logEvent,
TaskAttemptUnsuccessfulCompletionEvent historyEvent) {
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskId().toString());
entityPS.setString(3, historyEvent.getTaskType().toString());
entityPS.setLong(4, historyEvent.getFinishTime());
entityPS.setString(5, historyEvent.getTaskStatus());
entityPS.setString(6, historyEvent.getHostname());
entityPS.setString(7, historyEvent.getError());
entityPS.setString(8, historyEvent.getTaskAttemptId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() +
" for taskAttempt " + historyEvent.getTaskAttemptId() +
" into " + TASKATTEMPT_TABLE, sqle);
}
}
private void processMapAttemptFinishedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, MapAttemptFinishedEvent historyEvent) {
if (historyEvent.getTaskType() != TaskType.MAP) {
LOG.debug("Ignoring MapAttemptFinishedEvent for " +
historyEvent.getTaskType());
return;
}
long[] ioBytes = getInputOutputBytes(historyEvent.getCounters());
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskId().toString());
entityPS.setString(3, historyEvent.getTaskType().toString());
entityPS.setLong(4, historyEvent.getMapFinishTime());
entityPS.setLong(5, historyEvent.getFinishTime());
entityPS.setLong(6, ioBytes[0]);
entityPS.setLong(7, ioBytes[1]);
entityPS.setString(8, historyEvent.getTaskStatus());
entityPS.setString(9, historyEvent.getHostname());
entityPS.setString(10, historyEvent.getAttemptId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() +
" for taskAttempt " + historyEvent.getAttemptId() +
" into " + TASKATTEMPT_TABLE, sqle);
}
}
private void processReduceAttemptFinishedEvent(
PreparedStatement entityPS,
LoggingEvent logEvent, ReduceAttemptFinishedEvent historyEvent) {
if (historyEvent.getTaskType() != TaskType.REDUCE) {
LOG.debug("Ignoring ReduceAttemptFinishedEvent for " +
historyEvent.getTaskType());
return;
}
long[] ioBytes = getInputOutputBytes(historyEvent.getCounters());
try {
entityPS.setString(1,
historyEvent.getTaskId().getJobID().toString());
entityPS.setString(2, historyEvent.getTaskId().toString());
entityPS.setString(3, historyEvent.getTaskType().toString());
entityPS.setLong(4, historyEvent.getShuffleFinishTime());
entityPS.setLong(5, historyEvent.getSortFinishTime());
entityPS.setLong(6, historyEvent.getFinishTime());
entityPS.setLong(7, ioBytes[0]);
entityPS.setLong(8, ioBytes[1]);
entityPS.setString(9, historyEvent.getTaskStatus());
entityPS.setString(10, historyEvent.getHostname());
entityPS.setString(11, historyEvent.getAttemptId().toString());
entityPS.executeUpdate();
} catch (SQLException sqle) {
LOG.info("Failed to store " + historyEvent.getEventType() +
" for taskAttempt " + historyEvent.getAttemptId() +
" into " + TASKATTEMPT_TABLE, sqle);
}
}
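// Sums FileSystemCounters across HDFS and local-file reads/writes into a
// two-element array: [0] = bytes read, [1] = bytes written.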
public static long[] getInputOutputBytes(JhCounters counters) {
long inputBytes = 0;
long outputBytes = 0;
if (counters != null) {
for (JhCounterGroup counterGroup : counters.groups) {
if (counterGroup.name.equals("FileSystemCounters")) {
for (JhCounter counter : counterGroup.counts) {
if (counter.name.equals("HDFS_BYTES_READ") ||
counter.name.equals("FILE_BYTES_READ"))
inputBytes += counter.value;
else if (counter.name.equals("HDFS_BYTES_WRITTEN") ||
counter.name.equals("FILE_BYTES_WRITTEN"))
outputBytes += counter.value;
}
}
}
}
return new long[]{inputBytes, outputBytes};
}
@Override
public void update(LoggingEvent originalEvent, Object parsedEvent)
throws IOException {
try {
doUpdates(originalEvent, parsedEvent);
} catch (SQLException sqle) {
throw new IOException(sqle);
}
}
}