blob: a20e297670d96c597ea56201043fcf0edda98dd7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.util.Map;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.json.simple.JSONObject;
/**
 * Collects the statistics of a Pig action from the {@link PigStats} API and renders them as a
 * JSON string.
 */
public class OoziePigStats extends ActionStats {
    PigStats pigStats = null;

    /**
     * Creates the statistics holder for a Pig action and marks the action type as PIG.
     *
     * @param pigStats the Pig statistics object that {@link #toJSON()} reads from
     */
    public OoziePigStats(PigStats pigStats) {
        this.currentActionType = ActionType.PIG;
        this.pigStats = pigStats;
    }

    /**
     * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
     * <p>
     * The resulting object contains the action type, the script-level summary counters, one nested
     * object per job of the job graph (keyed by the job id) and a {@code JOB_GRAPH} entry holding
     * the job ids as a comma separated string. Numeric summary values are written as strings.
     *
     * @return a JSON string
     */
    @SuppressWarnings("unchecked")
    @Override
    public String toJSON() {
        JSONObject pigStatsGroup = new JSONObject();
        pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());

        // pig summary related counters
        pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
        pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
        pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
        pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
        pigStatsGroup.put("FEATURES", pigStats.getFeatures());
        pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
        pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
        pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
        pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
        pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
        pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
        pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
        pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
        pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));

        PigStats.JobGraph jobGraph = pigStats.getJobGraph();
        StringBuilder sb = new StringBuilder();
        String separator = ",";

        for (JobStats jobStats : jobGraph) {
            // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
            String hadoopId = jobStats.getJobId();
            if (sb.length() > 0) {
                sb.append(separator);
            }
            sb.append(hadoopId);
            // Hadoop Counters for pig created MR job
            pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
        }
        pigStatsGroup.put("JOB_GRAPH", sb.toString());
        return pigStatsGroup.toJSONString();
    }

    /**
     * Converts the statistics of a single job into a JSON object (MR job related counters).
     *
     * @param jobStats the statistics of one job of the job graph
     * @return a JSON object holding the job's counters and other stats
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromJobStats(JobStats jobStats) {
        JSONObject jobStatsGroup = new JSONObject();

        // hadoop counters
        jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
        jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
        jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
        jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
        jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
        // currently returns null; pig bug
        jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));

        // pig generated hadoop counters and other stats
        jobStatsGroup.put("Alias", jobStats.getAlias());
        jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
        // NOTE(review): getter name is spelled as called here; presumably matches the Pig API
        jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
        jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
        jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
        jobStatsGroup.put("FEATURE", jobStats.getFeature());
        jobStatsGroup.put("JOB_ID", jobStats.getJobId());
        jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
        jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
        jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
        jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
        jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
        jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
        jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
        jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
        // NOTE(review): getter name is spelled as called here; presumably matches the Pig API
        jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
        jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
        jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));

        return jobStatsGroup;
    }

    /**
     * Converts the multi-store counters into a JSON object; counter values are stored as given
     * (not converted to strings).
     *
     * @param map the multi-store counters, keyed by counter name; may be {@code null}
     * @return a JSON object with one entry per counter, or {@code null} if {@code map} is {@code null}
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
        // Mirror toJSONFromCounters: missing counters are reported as null instead of failing.
        if (map == null) {
            return null;
        }
        JSONObject group = new JSONObject();
        for (Map.Entry<String, Long> cName : map.entrySet()) {
            group.put(cName.getKey(), cName.getValue());
        }
        return group;
    }

    /**
     * Converts the hadoop counters into a JSON object with one nested object per counter group;
     * counter values are stored as strings.
     *
     * @param counters the hadoop counters; may be {@code null}
     * @return a JSON object keyed by group name, or {@code null} if {@code counters} is {@code null}
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromCounters(Counters counters) {
        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counter counter : counters.getGroup(gName)) {
                String cName = counter.getName();
                long cValue = counter.getValue();
                group.put(cName, Long.toString(cValue));
            }
            groups.put(gName, group);
        }
        return groups;
    }
}