PIG-5109: Remove HadoopJobHistoryLoader
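
The loader depended on the Hadoop 1 org.apache.hadoop.mapred.JobHistory and
DefaultJobHistoryParser APIs and was already excluded from Hadoop 2 builds of
piggybank, so it is removed now that Hadoop 1.x support is dropped (PIG-4923).
For reference only (this is the usage pattern deleted from the docs, not a
supported example), scripts of the following form will no longer work:

    a = load '/mapred/history/done'
        using org.apache.pig.piggybank.storage.HadoopJobHistoryLoader()
        as (j:map[], m:map[], r:map[]);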

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779362 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 1fb8c82..55d9fc5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,8 @@
 
 PIG-4923: Drop Hadoop 1.x support in Pig 0.17 (szita via rohini)
 
+PIG-5109: Remove HadoopJobHistoryLoader (szita via daijy)
+
 PIG-5067: Revisit union on numeric type and chararray to bytearray (knoguchi)
  
 IMPROVEMENTS
diff --git a/contrib/piggybank/java/build.xml b/contrib/piggybank/java/build.xml
index 1958c50..3bd2bcd 100755
--- a/contrib/piggybank/java/build.xml
+++ b/contrib/piggybank/java/build.xml
@@ -59,14 +59,6 @@
     </if>
     <property name="hadoopversion" value="2" />
 
-    <!-- JobHistoryLoader currently does not support 2 -->
-    <condition property="build.classes.excludes" value="**/HadoopJobHistoryLoader.java" else="">
-        <equals arg1="${hadoopversion}" arg2="2"/>
-    </condition>
-    <condition property="test.classes.excludes" value="**/TestHadoopJobHistoryLoader.java" else="">
-        <equals arg1="${hadoopversion}" arg2="2"/>
-    </condition>
-
     <condition property="hadoopsuffix" value="2" else="">
         <equals arg1="${hadoopversion}" arg2="2"/>
     </condition>
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java
deleted file mode 100644
index 9c937f9..0000000
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HadoopJobHistoryLoader.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.piggybank.storage;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Stack;
-
-import javax.xml.parsers.SAXParserFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.DefaultJobHistoryParser;
-import org.apache.hadoop.mapred.JobHistory;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.JobHistory.JobInfo;
-import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.data.Tuple;
-import org.xml.sax.Attributes;
-import org.xml.sax.SAXException;
-import org.xml.sax.helpers.DefaultHandler;
-
-public class HadoopJobHistoryLoader extends LoadFunc {
-         
-    private static final Log LOG = LogFactory.getLog(HadoopJobHistoryLoader.class);
-       
-    private RecordReader<Text, MRJobInfo> reader;
-    
-    public HadoopJobHistoryLoader() {
-    }
-   
-    @SuppressWarnings("unchecked")
-    @Override
-    public InputFormat getInputFormat() throws IOException {     
-        return new HadoopJobHistoryInputFormat();
-    }
-
-    @Override
-    public Tuple getNext() throws IOException {
-        boolean notDone = false;
-        try {
-            notDone = reader.nextKeyValue();
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-        if (!notDone) {
-            return null;
-        }   
-        Tuple t = null;
-        try {
-            MRJobInfo val = (MRJobInfo)reader.getCurrentValue();
-            t = DefaultTupleFactory.getInstance().newTuple(3);
-            t.set(0, val.job);
-            t.set(1, val.mapTask);
-            t.set(2, val.reduceTask);
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-        return t;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-            throws IOException {
-        this.reader = (HadoopJobHistoryReader)reader;
-    }
-
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        FileInputFormat.setInputPaths(job, location);
-        FileInputFormat.setInputPathFilter(job, JobHistoryPathFilter.class);
-    }
-    
-    public static class JobHistoryPathFilter implements PathFilter {
-        @Override
-        public boolean accept(Path p) {
-            String name = p.getName(); 
-            return !name.endsWith(".xml");
-        }       
-    }
-    
-    public static class HadoopJobHistoryInputFormat extends
-            FileInputFormat<Text, MRJobInfo> {
-
-        @Override
-        public RecordReader<Text, MRJobInfo> createRecordReader(
-                InputSplit split, TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            return new HadoopJobHistoryReader();
-        }
-
-        @Override
-        protected boolean isSplitable(JobContext context, Path filename) {
-            return false;
-        }         
-    }
-    
-    public static class HadoopJobHistoryReader extends
-            RecordReader<Text, MRJobInfo> {
-
-        private String location;
-        
-        private MRJobInfo value;
-        
-        private Configuration conf;
-                
-        @Override
-        public void close() throws IOException {            
-        }
-
-        @Override
-        public Text getCurrentKey() throws IOException, InterruptedException {
-            return null;
-        }
-
-        @Override
-        public MRJobInfo getCurrentValue() throws IOException,
-                InterruptedException {            
-            return value;
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return 0;
-        }
-
-        @Override
-        public void initialize(InputSplit split, TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            FileSplit fSplit = (FileSplit) split; 
-            Path p = fSplit.getPath();
-            location = p.toString();
-            LOG.info("location: " + location);    
-            conf = context.getConfiguration();
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            if (location != null) {
-                LOG.info("load: " + location);  
-                Path full = new Path(location);  
-                String[] jobDetails = 
-                    JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
-                String jobId = jobDetails[2] + "_" + jobDetails[3] + "_"
-                        + jobDetails[4];
-                JobHistory.JobInfo job = new JobHistory.JobInfo(jobId); 
- 
-                value = new MRJobInfo();
-                                            
-                FileSystem fs = full.getFileSystem(conf);
-                FileStatus fstat = fs.getFileStatus(full);
-                
-                LOG.info("file size: " + fstat.getLen());
-                DefaultJobHistoryParser.parseJobTasks(location, job,
-                        full.getFileSystem(conf)); 
-                LOG.info("job history parsed sucessfully");
-                HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
-                LOG.info("get parsed job history");
-                
-                // parse Hadoop job xml file
-                Path parent = full.getParent();
-                String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml";
-                Path p = new Path(parent, jobXml);  
-             
-                FSDataInputStream fileIn = fs.open(p);
-                Map<String, String> val = HadoopJobHistoryLoader
-                        .parseJobXML(fileIn);
-                for (String key : val.keySet()) {
-                    value.job.put(key, val.get(key));
-                }
-                
-                location = null;
-                return true;
-            }          
-            value = null;
-            return false;
-        }   
-    }
-    
-    //------------------------------------------------------------------------
-        
-    public static class MRJobInfo {
-        public Map<String, String> job;
-        public Map<String, String> mapTask;
-        public Map<String, String> reduceTask;
-        
-        public MRJobInfo() {
-            job = new HashMap<String, String>();
-            mapTask = new HashMap<String, String>();
-            reduceTask = new HashMap<String, String>();
-        }
-    }
-    
-    //--------------------------------------------------------------------------------------------
-    
-    public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
-    public static final String MAP_INPUT_RECORDS = "MAP_INPUT_RECORDS";
-    public static final String REDUCE_INPUT_RECORDS = "REDUCE_INPUT_RECORDS";
-    
-    /**
-     * Job Keys
-     */
-    public static enum JobKeys {
-        JOBTRACKERID, JOBID, JOBNAME, JOBTYPE, USER, SUBMIT_TIME, CONF_PATH, LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES,
-        STATUS, FINISH_TIME, FINISHED_MAPS, FINISHED_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
-        LAUNCHED_MAPS, LAUNCHED_REDUCES, RACKLOCAL_MAPS, DATALOCAL_MAPS, HDFS_BYTES_READ,
-        HDFS_BYTES_WRITTEN, FILE_BYTES_READ, FILE_BYTES_WRITTEN, COMBINE_OUTPUT_RECORDS,
-        COMBINE_INPUT_RECORDS, REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, REDUCE_OUTPUT_RECORDS,
-        MAP_INPUT_RECORDS, MAP_OUTPUT_RECORDS, MAP_INPUT_BYTES, MAP_OUTPUT_BYTES, MAP_HDFS_BYTES_WRITTEN,
-        JOBCONF, JOB_PRIORITY, SHUFFLE_BYTES, SPILLED_RECORDS
-    }
-    
-    public static void parseJobHistory(Configuration jobConf, JobInfo jobInfo, MRJobInfo value) {
-        value.job.clear();
-        populateJob(jobInfo.getValues(), value.job);
-        value.mapTask.clear();
-        value.reduceTask.clear();
-        populateMapReduceTaskLists(value, jobInfo.getAllTasks());
-    }
-    
-    private static void populateJob (Map<JobHistory.Keys, String> jobC, Map<String, String> job) {            
-        int size = jobC.size();
-        Iterator<Map.Entry<JobHistory.Keys, String>> kv = jobC.entrySet().iterator();
-        for (int i = 0; i < size; i++) {
-            Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
-            JobHistory.Keys key = entry.getKey();
-            String value = entry.getValue();
-            switch (key) {
-            case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID.toString(), value); break;           
-            case FINISH_TIME: job.put(JobKeys.FINISH_TIME.toString(), value); break;
-            case JOBID: job.put(JobKeys.JOBID.toString(), value); break;
-            case JOBNAME: job.put(JobKeys.JOBNAME.toString(), value); break;
-            case USER: job.put(JobKeys.USER.toString(), value); break;
-            case JOBCONF: job.put(JobKeys.JOBCONF.toString(), value); break;
-            case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME.toString(), value); break;
-            case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME.toString(), value); break;
-            case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS.toString(), value); break;
-            case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES.toString(), value); break;
-            case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS.toString(), value); break;
-            case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES.toString(), value); break;
-            case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS.toString(), value); break;
-            case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES.toString(), value); break;
-            case JOB_STATUS: job.put(JobKeys.STATUS.toString(), value); break;
-            case COUNTERS:
-                value.concat(",");
-                parseAndAddJobCounters(job, value);
-                break;
-            default: 
-                LOG.debug("JobHistory.Keys."+ key + " : NOT INCLUDED IN LOADER RETURN VALUE");
-                break;
-            }
-        }
-    }
-    
-    /*
-     * Parse and add the job counters
-     */
-    @SuppressWarnings("deprecation")
-    private static void parseAndAddJobCounters(Map<String, String> job, String counters) {
-        try {
-            Counters counterGroups = Counters.fromEscapedCompactString(counters);
-            for (Group otherGroup : counterGroups) {
-                Group group = counterGroups.getGroup(otherGroup.getName());
-                for (Counter otherCounter : otherGroup) {
-                    Counter counter = group.getCounterForName(otherCounter.getName());
-                    job.put(otherCounter.getName(), String.valueOf(counter.getValue()));
-                }
-            }
-        } catch (ParseException e) {
-           LOG.warn("Failed to parse job counters", e);
-        }
-    } 
-    
-    @SuppressWarnings("deprecation")
-    private static void populateMapReduceTaskLists (MRJobInfo value, 
-            Map<String, JobHistory.Task> taskMap) {
-                
-        Map<String, String> mapT = value.mapTask;
-        Map<String, String> reduceT = value.reduceTask;
-        long minMapRows = Long.MAX_VALUE;
-        long maxMapRows = 0;
-        long minMapTime = Long.MAX_VALUE;
-        long maxMapTime = 0;
-        long avgMapTime = 0;
-        long totalMapTime = 0;
-        int numberMaps = 0;
-        
-        long minReduceRows = Long.MAX_VALUE;
-        long maxReduceRows = 0;        
-        long minReduceTime = Long.MAX_VALUE;
-        long maxReduceTime = 0;
-        long avgReduceTime = 0;
-        long totalReduceTime = 0;
-        int numberReduces = 0;
-       
-        int num_tasks = taskMap.entrySet().size();
-        Iterator<Map.Entry<String, JobHistory.Task>> ti = taskMap.entrySet().iterator();
-        for (int i = 0; i < num_tasks; i++) {
-            Map.Entry<String, JobHistory.Task> entry = (Map.Entry<String, JobHistory.Task>) ti.next();
-            JobHistory.Task task = entry.getValue();
-            if (task.get(Keys.TASK_TYPE).equals("MAP")) {
-                Map<JobHistory.Keys, String> mapTask = task.getValues();
-                Map<JobHistory.Keys, String> successTaskAttemptMap  =  getLastSuccessfulTaskAttempt(task);
-                // NOTE: Following would lead to less number of actual tasks collected in the tasklist array
-                if (successTaskAttemptMap != null) {
-                    mapTask.putAll(successTaskAttemptMap);
-                } else {
-                    LOG.info("Task:<" + task.get(Keys.TASKID) + "> is not successful - SKIPPING");
-                }
-                long duration = 0;
-                long startTime = 0;
-                long endTime = 0;
-                int size = mapTask.size();
-                numberMaps++;
-                Iterator<Map.Entry<JobHistory.Keys, String>> kv = mapTask.entrySet().iterator();
-                for (int j = 0; j < size; j++) {
-                    Map.Entry<JobHistory.Keys, String> mtc = kv.next();
-                    JobHistory.Keys key = mtc.getKey();
-                    String val = mtc.getValue();
-                    switch (key) {
-                    case START_TIME: 
-                        startTime = Long.valueOf(val);
-                        break;
-                    case FINISH_TIME:
-                        endTime = Long.valueOf(val);
-                        break;                    
-                    case COUNTERS: {
-                        try {
-                            Counters counters = Counters.fromEscapedCompactString(val);
-                            long rows = counters.getGroup(TASK_COUNTER_GROUP)
-                                    .getCounterForName(MAP_INPUT_RECORDS).getCounter(); 
-                            if (rows < minMapRows) minMapRows = rows;
-                            if (rows > maxMapRows) maxMapRows = rows;
-                        } catch (ParseException e) {
-                            LOG.warn("Failed to parse job counters", e);
-                        }
-                    }
-                    break;
-                    default: 
-                        LOG.warn("JobHistory.Keys." + key 
-                                + " : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
-                        break;
-                    }
-                }
-                duration = endTime - startTime;
-                if (minMapTime > duration) minMapTime = duration;
-                if (maxMapTime < duration) maxMapTime = duration;
-                totalMapTime += duration;        
-            } else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) {
-                Map<JobHistory.Keys, String> reduceTask = task.getValues();
-                Map<JobHistory.Keys, String> successTaskAttemptMap  =  getLastSuccessfulTaskAttempt(task);
-                // NOTE: Following would lead to less number of actual tasks collected in the tasklist array
-                if (successTaskAttemptMap != null) {
-                    reduceTask.putAll(successTaskAttemptMap);
-                } else {
-                    LOG.warn("Task:<" + task.get(Keys.TASKID) + "> is not successful - SKIPPING");
-                }
-                long duration = 0;
-                long startTime = 0;
-                long endTime = 0;
-                int size = reduceTask.size();
-                numberReduces++;
-
-                Iterator<Map.Entry<JobHistory.Keys, String>> kv = reduceTask.entrySet().iterator();
-                for (int j = 0; j < size; j++) {
-                    Map.Entry<JobHistory.Keys, String> rtc = kv.next();
-                    JobHistory.Keys key = rtc.getKey();
-                    String val = rtc.getValue();
-                    switch (key) {
-                    case START_TIME: 
-                        startTime = Long.valueOf(val);
-                        break;
-                    case FINISH_TIME:
-                        endTime = Long.valueOf(val);
-                        break;
-                    case COUNTERS: {
-                        try {
-                            Counters counters = Counters.fromEscapedCompactString(val);
-                            long rows = counters.getGroup(TASK_COUNTER_GROUP)
-                                    .getCounterForName(REDUCE_INPUT_RECORDS).getCounter(); 
-                            if (rows < minReduceRows) minReduceRows = rows;
-                            if (rows > maxReduceRows) maxReduceRows = rows;
-                        } catch (ParseException e) {
-                            LOG.warn("Failed to parse job counters", e);
-                        }
-                    }
-                    break;
-                    default: 
-                        LOG.warn("JobHistory.Keys." + key 
-                                + " : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
-                        break;
-                    }
-                }
-                
-                duration = endTime - startTime;
-                if (minReduceTime > duration) minReduceTime = duration;
-                if (maxReduceTime < duration) maxReduceTime = duration;
-                totalReduceTime += duration;
-
-            } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) {
-                LOG.info("IGNORING TASK TYPE : " + task.get(Keys.TASK_TYPE));
-            } else {
-                LOG.warn("UNKNOWN TASK TYPE : " + task.get(Keys.TASK_TYPE));
-            }
-        }
-        if (numberMaps > 0) {
-            avgMapTime = (totalMapTime / numberMaps);
-            mapT.put("MIN_MAP_TIME", String.valueOf(minMapTime));
-            mapT.put("MAX_MAP_TIME", String.valueOf(maxMapTime));
-            mapT.put("MIN_MAP_INPUT_ROWS", String.valueOf(minMapRows));
-            mapT.put("MAX_MAP_INPUT_ROWS", String.valueOf(maxMapRows));
-            mapT.put("AVG_MAP_TIME", String.valueOf(avgMapTime));
-            mapT.put("NUMBER_MAPS", String.valueOf(numberMaps));
-        }
-        if (numberReduces > 0) {
-            avgReduceTime = (totalReduceTime /numberReduces);
-            reduceT.put("MIN_REDUCE_TIME", String.valueOf(minReduceTime));
-            reduceT.put("MAX_REDUCE_TIME", String.valueOf(maxReduceTime));
-            reduceT.put("AVG_REDUCE_TIME", String.valueOf(avgReduceTime));
-            reduceT.put("MIN_REDUCE_INPUT_ROWS", String.valueOf(minReduceTime));
-            reduceT.put("MAX_REDUCE_INPUT_ROWS", String.valueOf(maxReduceTime));            
-            reduceT.put("NUMBER_REDUCES", String.valueOf(numberReduces));
-        } else {
-            reduceT.put("NUMBER_REDUCES", String.valueOf(0));
-        }
-    }
-    
-    /*
-     * Get last successful task attempt to be added in the stats
-     */
-    private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(
-            JobHistory.Task task) {
-
-        Map<String, JobHistory.TaskAttempt> taskAttempts = task
-                .getTaskAttempts();
-        int size = taskAttempts.size();
-        Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts
-                .entrySet().iterator();
-        for (int i = 0; i < size; i++) {
-            // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
-            Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next();
-            JobHistory.TaskAttempt attempt = tae.getValue();
-            if (null != attempt && null != attempt.getValues() && attempt.getValues().containsKey(JobHistory.Keys.TASK_STATUS) && attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals(
-                    "SUCCESS")) {
-                return attempt.getValues();
-            }
-        }
-
-        return null;
-    }
-   
-    //-------------------------------------------------------------------------
-    /*
-     * Job xml keys
-     */
-    private static final Map<String, String> XML_KEYS;
-    
-    static {
-        XML_KEYS = new HashMap<String, String> ();
-        XML_KEYS.put(MRConfiguration.JOB_QUEUE_NAME, "QUEUE_NAME");
-        XML_KEYS.put("group.name", "USER_GROUP");
-        XML_KEYS.put("user.name", "USER");
-        XML_KEYS.put("user.dir", "HOST_DIR");
-        XML_KEYS.put("cluster", "CLUSTER");
-        XML_KEYS.put("jobName", "JOB_NAME");
-        XML_KEYS.put("pig.script.id", "PIG_SCRIPT_ID");
-        XML_KEYS.put("pig.script", "PIG_SCRIPT");
-        XML_KEYS.put("pig.hadoop.version", "HADOOP_VERSION");
-        XML_KEYS.put("pig.version", "PIG_VERSION");
-        XML_KEYS.put("pig.job.feature", "PIG_JOB_FEATURE");
-        XML_KEYS.put("pig.alias", "PIG_JOB_ALIAS"); 
-        XML_KEYS.put("pig.parent.jobid", "PIG_JOB_PARENTS");
-        XML_KEYS.put("pig.host", "HOST_NAME");        
-    }
-     
-    public static Map<String, String> parseJobXML(InputStream in) {
-        
-        HashMap<String, String> xmlMap = new HashMap<String, String>();
-        
-        try {
-            SAXParserFactory.newInstance().newSAXParser().parse(in,
-                    new JobXMLHandler(xmlMap));
-        } catch (Exception e) {
-            LOG.warn("Failed to parser job xml", e);
-        }
-                          
-        return xmlMap;
-    }
-        
-    private static class JobXMLHandler extends DefaultHandler {
-                  
-        private static final String NAME = "name";
-        private static final String VALUE = "value";
-        
-        private static Stack<String> tags = new Stack<String>();    
-        
-        private static String curTag;
-        
-        private static String key;
- 
-        private static Map<String, String> xmlMap;
-                
-        public JobXMLHandler(Map<String, String> xml) {
-            xmlMap = xml;
-        }
-        
-        @Override
-        public void startElement(String uri, String localName,
-                String qName, Attributes attributes) throws SAXException {            
-            tags.add(qName);
-            curTag = qName;
-        }
-        
-        @Override
-        public void endElement(String uri, String localName,
-                String qName) throws SAXException {
-            String tag = tags.pop();
-            if (tag == null || !tag.equalsIgnoreCase(qName)) {
-                throw new SAXException("Malformatted XML file: " + tag + " : "
-                        + qName);
-            }
-            curTag = null;
-        }
-        
-        public void characters(char[] ch, int start, int length)
-                throws SAXException {
-            if (tags.size() > 1) {
-                String s = new String(ch, start, length); 
-                if (curTag.equalsIgnoreCase(NAME)) {
-                    key = s;
-                }
-                if (curTag.equalsIgnoreCase(VALUE)) {
-                    String displayKey = XML_KEYS.get(key);
-                    if (displayKey != null) {
-                        xmlMap.put(displayKey, s);
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java
deleted file mode 100644
index 242f45f..0000000
--- a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHadoopJobHistoryLoader.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.piggybank.test.storage;
-
-
-import static org.junit.Assert.*;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.data.Tuple;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestHadoopJobHistoryLoader {
-
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-    }
-
-    private static final String INPUT_DIR = 
-        "src/test/java/org/apache/pig/piggybank/test/data/jh";
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testHadoopJHLoader() throws Exception {
-        PigServer pig = new PigServer(ExecType.LOCAL);
-        pig.registerQuery("a = load '" + INPUT_DIR 
-                + "' using org.apache.pig.piggybank.storage.HadoopJobHistoryLoader() " 
-                + "as (j:map[], m:map[], r:map[]);");
-        Iterator<Tuple> iter = pig.openIterator("a");
-        
-        assertTrue(iter.hasNext());
-        
-        Tuple t = iter.next();
-        
-        Map<String, Object> job = (Map<String, Object>)t.get(0);
-        
-        assertEquals("3eb62180-5473-4301-aa22-467bd685d466", (String)job.get("PIG_SCRIPT_ID"));
-        assertEquals("job_201004271216_9998", (String)job.get("JOBID"));
-        assertEquals("job_201004271216_9995", (String)job.get("PIG_JOB_PARENTS"));
-        assertEquals("0.8.0-dev", (String)job.get("PIG_VERSION"));
-        assertEquals("0.20.2", (String)job.get("HADOOP_VERSION"));
-        assertEquals("d", (String)job.get("PIG_JOB_ALIAS"));
-        assertEquals("PigLatin:Test.pig", job.get("JOBNAME"));
-        assertEquals("ORDER_BY", (String)job.get("PIG_JOB_FEATURE"));
-        assertEquals("1", (String)job.get("TOTAL_MAPS"));
-        assertEquals("1", (String)job.get("TOTAL_REDUCES"));              
-    }
-}
diff --git a/src/docs/src/documentation/content/xdocs/pig-index.xml b/src/docs/src/documentation/content/xdocs/pig-index.xml
index 2ca63a3..8e831e2 100644
--- a/src/docs/src/documentation/content/xdocs/pig-index.xml
+++ b/src/docs/src/documentation/content/xdocs/pig-index.xml
@@ -404,7 +404,6 @@
 <p>Hadoop
 <br></br>&nbsp;&nbsp;&nbsp; <a href="cmds.html#fs">FsShell commands</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="basic.html#load-glob">Hadoop globbing</a>
-<br></br>&nbsp;&nbsp;&nbsp; <a href="test.html#hadoop-job-history-loader">HadoopJobHistoryLoader</a>
 <br></br>&nbsp;&nbsp;&nbsp; hadoop partitioner. <em>See</em> PARTITION BY
 <br></br>&nbsp;&nbsp;&nbsp; <a href="start.html#hadoop-properties">Hadoop properties</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="start.html#req">versions supported</a>
diff --git a/src/docs/src/documentation/content/xdocs/test.xml b/src/docs/src/documentation/content/xdocs/test.xml
index 83432cc..8d60759 100644
--- a/src/docs/src/documentation/content/xdocs/test.xml
+++ b/src/docs/src/documentation/content/xdocs/test.xml
@@ -540,7 +540,7 @@
 
 <p>Pig Statistics is a framework for collecting and storing script-level statistics for Pig Latin. Characteristics of Pig Latin scripts and the resulting MapReduce jobs are collected while the script is executed. These statistics are then available for Pig users and tools using Pig (such as Oozie) to retrieve after the job is done.</p>
 
-<p>The new Pig statistics and the existing Hadoop statistics can also be accessed via the Hadoop job history file (and job xml file). Piggybank has a HadoopJobHistoryLoader which acts as an example of using Pig itself to query these statistics (the loader can be used as a reference implementation but is NOT supported for production use).</p>
+<p>The new Pig statistics and the existing Hadoop statistics can also be accessed via the Hadoop job history file (and job xml file).</p>
 
 <!-- +++++++++++++++++++++++++++++++++++++++ -->
 <section>
@@ -708,93 +708,6 @@
 </tr>
 </table>
 </section>
-
-
-<!-- +++++++++++++++++++++++++++++++++++++++ -->
-<section id="hadoop-job-history-loader">
-<title>Hadoop Job History Loader</title>
-<p>The HadoopJobHistoryLoader in Piggybank loads Hadoop job history files and job xml files from file system. For each MapReduce job, the loader produces a tuple with schema (j:map[], m:map[], r:map[]). The first map in the schema contains job-related entries. Here are some of important key names in the map: </p>
-
-<table>
-<tr>
-<td>
-<p>PIG_SCRIPT_ID</p>
-<p>CLUSTER </p>
-<p>QUEUE_NAME</p>
-<p>JOBID</p>
-<p>JOBNAME</p>
-<p>STATUS</p>
-</td>
-<td>
-<p>USER </p>
-<p>HADOOP_VERSION  </p>
-<p>PIG_VERSION</p>
-<p>PIG_JOB_FEATURE</p>
-<p>PIG_JOB_ALIAS </p>
-<p>PIG_JOB_PARENTS</p>
-</td>
-<td>
-<p>SUBMIT_TIME</p>
-<p>LAUNCH_TIME</p>
-<p>FINISH_TIME</p>
-<p>TOTAL_MAPS</p>
-<p>TOTAL_REDUCES</p>
-</td>
-</tr>
-</table>
-<p></p>
-<p>Examples that use the loader to query Pig statistics are shown below.</p>
-</section>
-
-<!-- +++++++++++++++++++++++++++++++++++++++ -->
-<section>
-<title>Examples</title>
-<p>Find scripts that generate more then three MapReduce jobs:</p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = group a by (j#'PIG_SCRIPT_ID', j#'USER', j#'JOBNAME');
-c = foreach b generate group.$1, group.$2, COUNT(a);
-d = filter c by $2 > 3;
-dump d;
-</source>
-
-<p>Find the running time of each script (in seconds): </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, 
-         (Long) j#'SUBMIT_TIME' as start, (Long) j#'FINISH_TIME' as end;
-c = group b by (id, user, script_name)
-d = foreach c generate group.user, group.script_name, (MAX(b.end) - MIN(b.start)/1000;
-dump d;
-</source>
-
-<p>Find the number of scripts run by user and queue on a cluster: </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'QUEUE_NAME' as queue;
-c = group b by (id, user, queue) parallel 10;
-d = foreach c generate group.user, group.queue, COUNT(b);
-dump d;
-</source>
-
-<p>Find scripts that have failed jobs: </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate (Chararray) j#'STATUS' as status, j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, j#'JOBID' as job;
-c = filter b by status != 'SUCCESS';
-dump c;
-</source>
-
-<p>Find scripts that use only the default parallelism: </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, (Long) r#'NUMBER_REDUCES' as reduces;
-c = group b by (id, user, script_name) parallel 10;
-d = foreach c generate group.user, group.script_name, MAX(b.reduces) as max_reduces;
-e = filter d by max_reduces == 1;
-dump e;
-</source>
-</section>
 </section>   
 
 <!-- =========================================================================== -->