blob: 7c20382b316a9ac8ff6b6afe606b0a848068362d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.tools.pigstats.spark;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
import java.util.List;
import java.util.Map;
public class Spark2JobStats extends SparkJobStats {
public Spark2JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
super(jobId, plan, conf);
}
public Spark2JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
super(jobId, plan, conf);
}
@Override
protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
Map<String, Long> results = Maps.newLinkedHashMap();
long executorDeserializeTime = 0;
long executorRunTime = 0;
long resultSize = 0;
long jvmGCTime = 0;
long resultSerializationTime = 0;
long memoryBytesSpilled = 0;
long diskBytesSpilled = 0;
long bytesRead = 0;
long bytesWritten = 0;
long remoteBlocksFetched = 0;
long localBlocksFetched = 0;
long fetchWaitTime = 0;
long remoteBytesRead = 0;
long shuffleBytesWritten = 0;
long shuffleWriteTime = 0;
for (List<TaskMetrics> stageMetric : jobMetric.values()) {
if (stageMetric != null) {
for (TaskMetrics taskMetrics : stageMetric) {
if (taskMetrics != null) {
executorDeserializeTime += taskMetrics.executorDeserializeTime();
executorRunTime += taskMetrics.executorRunTime();
resultSize += taskMetrics.resultSize();
jvmGCTime += taskMetrics.jvmGCTime();
resultSerializationTime += taskMetrics.resultSerializationTime();
memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
diskBytesSpilled += taskMetrics.diskBytesSpilled();
bytesRead += taskMetrics.inputMetrics().bytesRead();
bytesWritten += taskMetrics.outputMetrics().bytesWritten();
ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched();
localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched();
fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime();
remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead();
ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
shuffleBytesWritten += shuffleWriteMetricsOption.shuffleBytesWritten();
shuffleWriteTime += shuffleWriteMetricsOption.shuffleWriteTime();
}
}
}
}
results.put("ExcutorDeserializeTime", executorDeserializeTime);
results.put("ExecutorRunTime", executorRunTime);
results.put("ResultSize", resultSize);
results.put("JvmGCTime", jvmGCTime);
results.put("ResultSerializationTime", resultSerializationTime);
results.put("MemoryBytesSpilled", memoryBytesSpilled);
results.put("DiskBytesSpilled", diskBytesSpilled);
results.put("BytesRead", bytesRead);
hdfsBytesRead = bytesRead;
counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
results.put("BytesWritten", bytesWritten);
hdfsBytesWritten = bytesWritten;
counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
results.put("RemoteBlocksFetched", remoteBlocksFetched);
results.put("LocalBlocksFetched", localBlocksFetched);
results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
results.put("FetchWaitTime", fetchWaitTime);
results.put("RemoteBytesRead", remoteBytesRead);
results.put("ShuffleBytesWritten", shuffleBytesWritten);
results.put("ShuffleWriteTime", shuffleWriteTime);
return results;
}
}