| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.tools.pigstats.spark; |
| |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapred.Counters; |
| import org.apache.pig.PigWarning; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector; |
| import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.newplan.PlanVisitor; |
| import org.apache.pig.tools.pigstats.InputStats; |
| import org.apache.pig.tools.pigstats.JobStats; |
| import org.apache.pig.tools.pigstats.OutputStats; |
| import org.apache.pig.tools.pigstats.PigStats; |
| import org.apache.pig.tools.pigstats.PigStatsUtil; |
| import org.apache.spark.executor.TaskMetrics; |
| |
| import com.google.common.collect.Maps; |
| |
| public abstract class SparkJobStats extends JobStats { |
| |
| private int jobId; |
| private Map<String, Long> stats = Maps.newLinkedHashMap(); |
| private boolean disableCounter; |
| protected Counters counters = null; |
| public static String FS_COUNTER_GROUP = "FS_GROUP"; |
| private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null; |
| |
| protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { |
| this(String.valueOf(jobId), plan, conf); |
| this.jobId = jobId; |
| } |
| |
| protected SparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { |
| super(jobId, plan); |
| setConf(conf); |
| } |
| |
| @Override |
| public void setConf(Configuration conf) { |
| super.setConf(conf); |
| disableCounter = conf.getBoolean("pig.disable.counter", false); |
| initializeHadoopCounter(); |
| } |
| |
| public void addOutputInfo(POStore poStore, boolean success, |
| JobStatisticCollector jobStatisticCollector) { |
| if (!poStore.isTmpStore()) { |
| long bytes = getOutputSize(poStore, conf); |
| long recordsCount = -1; |
| if (disableCounter == false) { |
| recordsCount = SparkStatsUtil.getRecordCount(poStore); |
| } |
| OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(), |
| bytes, recordsCount, success); |
| outputStats.setPOStore(poStore); |
| outputStats.setConf(conf); |
| |
| outputs.add(outputStats); |
| } |
| } |
| |
| public void addInputStats(POLoad po, boolean success, |
| boolean singleInput) { |
| |
| long recordsCount = -1; |
| if (disableCounter == false) { |
| recordsCount = SparkStatsUtil.getRecordCount(po); |
| } |
| long bytesRead = -1; |
| if (singleInput && stats.get("BytesRead") != null) { |
| bytesRead = stats.get("BytesRead"); |
| } |
| InputStats inputStats = new InputStats(po.getLFile().getFileName(), |
| bytesRead, recordsCount, success); |
| inputStats.setConf(conf); |
| |
| inputs.add(inputStats); |
| } |
| |
| public void collectStats(JobStatisticCollector jobStatisticCollector) { |
| if (jobStatisticCollector != null) { |
| Map<String, List<TaskMetrics>> taskMetrics = jobStatisticCollector.getJobMetric(jobId); |
| if (taskMetrics == null) { |
| throw new RuntimeException("No task metrics available for jobId " + jobId); |
| } |
| stats = combineTaskMetrics(taskMetrics); |
| } |
| } |
| |
| protected abstract Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric); |
| |
| public Map<String, Long> getStats() { |
| return stats; |
| } |
| |
| @Override |
| public String getJobId() { |
| return String.valueOf(jobId); |
| } |
| |
| @Override |
| public void accept(PlanVisitor v) throws FrontendException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public String getDisplayString() { |
| return null; |
| } |
| |
| @Override |
| public int getNumberMaps() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public int getNumberReduces() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMaxMapTime() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMinMapTime() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getAvgMapTime() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMaxReduceTime() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMinReduceTime() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getAvgREduceTime() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMapInputRecords() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMapOutputRecords() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getReduceInputRecords() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getReduceOutputRecords() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getSMMSpillCount() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getProactiveSpillCountObjects() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getProactiveSpillCountRecs() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Counters getHadoopCounters() { |
| return counters; |
| } |
| |
| @Override |
| public Map<String, Long> getMultiStoreCounters() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Map<String, Long> getMultiInputCounters() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public void setAlias(SparkOperator sparkOperator) { |
| SparkScriptState ss = (SparkScriptState) SparkScriptState.get(); |
| SparkScriptState.SparkScriptInfo sparkScriptInfo = ss.getScriptInfo(); |
| annotate(ALIAS, sparkScriptInfo.getAlias(sparkOperator)); |
| annotate(ALIAS_LOCATION, sparkScriptInfo.getAliasLocation(sparkOperator)); |
| annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator)); |
| } |
| |
| private void initializeHadoopCounter() { |
| counters = new Counters(); |
| Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP); |
| fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0); |
| fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0); |
| } |
| |
| |
| public Map<String, SparkCounter<Map<String, Long>>> getWarningCounters() { |
| return warningCounters; |
| } |
| |
| public void initWarningCounters() { |
| SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters(); |
| SparkCounterGroup<Map<String, Long>> sparkCounterGroup = counters.getSparkCounterGroups().get( |
| PigWarning.class.getCanonicalName()); |
| if (sparkCounterGroup != null) { |
| this.warningCounters = sparkCounterGroup.getSparkCounters(); |
| } |
| } |
| } |