MINOR: enrich MAP_REDUCE_JOB_STREAM
Add 5 fields in MAP_REDUCE_JOB_STREAM
* numTotalMaps
* numTotalReduces
* duration
* avgMapTime
* avgReduceTime
Author: Zhao, Qingwen <qingwzhao@apache.org>
Closes #940 from qingwen220/minor.
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
index 457f0c5..86ad2c1 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/rpc/JobRpcEvaluator.java
@@ -54,8 +54,12 @@
long reduceStartTime = Long.MAX_VALUE;
long reduceEndTime = 0;
+ double totalMapTime = 0;
+ double totalReduceTime = 0;
+
for (TaskExecutionAPIEntity task : entity.getTasksMap().values()) {
if (task.getTags().get(TASK_TYPE.toString()).equalsIgnoreCase(Constants.TaskType.MAP.toString())) {
+ totalMapTime += task.getDuration();
if (mapStartTime > task.getStartTime()) {
mapStartTime = task.getStartTime();
}
@@ -63,6 +67,7 @@
mapEndTime = task.getEndTime();
}
} else {
+ totalReduceTime += task.getDuration();
if (reduceStartTime > task.getStartTime()) {
reduceStartTime = task.getStartTime();
}
@@ -83,26 +88,42 @@
analysisAPIEntity.setTags(tags);
analysisAPIEntity.setTimestamp(entity.getStartTime());
analysisAPIEntity.setTrackingUrl(entity.getTrackingUrl());
+ analysisAPIEntity.setDuration(entity.getDurationTime());
+ analysisAPIEntity.setNumTotalMaps(entity.getTotalMaps());
+ analysisAPIEntity.setNumTotalReduces(entity.getTotalReduces());
+ analysisAPIEntity.setCurrentState(entity.getCurrentState());
+
+ double avgOpsPerMap = 0;
+ double avgMapTime = 0;
+ double avgOpsPerReduce = 0;
+ double avgReduceTime = 0;
+ double mapOpsPerSecond = 0;
+ double reduceOpsPerSecond = 0;
+
+ if (entity.getTotalMaps() > 0) {
+ avgMapTime = totalMapTime / entity.getTotalMaps();
+ avgOpsPerMap = totalMapHdfsOps / entity.getTotalMaps();
+ mapOpsPerSecond = totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000);
+ }
+ if (entity.getTotalReduces() > 0) {
+ avgReduceTime = totalReduceTime / entity.getTotalReduces();
+ avgOpsPerReduce = totalReduceHdfsOps / entity.getTotalReduces();
+ reduceOpsPerSecond = totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000);
+ }
double totalOpsPerSecond = (entity.getDurationTime() == 0) ? 0 :
(totalMapHdfsOps + totalReduceHdfsOps) / (entity.getDurationTime() / 1000);
- double mapOpsPerSecond = (entity.getTotalMaps() == 0) ? 0 :
- totalMapHdfsOps / ((mapEndTime - mapStartTime) / 1000);
- double reduceOpsPerSecond = (entity.getTotalReduces() == 0) ? 0 :
- totalReduceHdfsOps / ((reduceEndTime - reduceStartTime) / 1000);
-
- double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces());
- double avgOpsPerMap = (entity.getTotalMaps() == 0) ? 0 :
- totalMapHdfsOps / entity.getTotalMaps();
- double avgOpsPerReduce = (entity.getTotalReduces() == 0) ? 0 :
- totalReduceHdfsOps / entity.getTotalReduces();
+ double avgOpsPerTask = (totalMapHdfsOps + totalReduceHdfsOps) / (entity.getTotalMaps() + entity.getTotalReduces());
+
analysisAPIEntity.setTotalOpsPerSecond(totalOpsPerSecond);
analysisAPIEntity.setMapOpsPerSecond(mapOpsPerSecond);
analysisAPIEntity.setReduceOpsPerSecond(reduceOpsPerSecond);
analysisAPIEntity.setAvgOpsPerTask(avgOpsPerTask);
analysisAPIEntity.setAvgOpsPerMap(avgOpsPerMap);
analysisAPIEntity.setAvgOpsPerReduce(avgOpsPerReduce);
+ analysisAPIEntity.setAvgMapTime(avgMapTime);
+ analysisAPIEntity.setAvgReduceTime(avgReduceTime);
Result.EvaluatorResult result = new Result.EvaluatorResult();
result.addProcessorEntity(JobRpcEvaluator.class, analysisAPIEntity);
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
index 6c0e539..ec04286 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobRpcAnalysisAPIEntity.java
@@ -51,6 +51,16 @@
private double avgOpsPerMap;
@Column("h")
private double avgOpsPerReduce;
+ @Column("i")
+ private double avgMapTime;
+ @Column("j")
+ private double avgReduceTime;
+ @Column("k")
+ private int numTotalMaps;
+ @Column("l")
+ private int numTotalReduces;
+ @Column("m")
+ private long duration;
public String getTrackingUrl() {
return trackingUrl;
@@ -124,5 +134,50 @@
valueChanged("avgOpsPerReduce");
}
+ public double getAvgMapTime() {
+ return avgMapTime;
+ }
+
+ public void setAvgMapTime(double avgMapTime) {
+ this.avgMapTime = avgMapTime;
+ valueChanged("avgMapTime");
+ }
+
+ public double getAvgReduceTime() {
+ return avgReduceTime;
+ }
+
+ public void setAvgReduceTime(double avgReduceTime) {
+ this.avgReduceTime = avgReduceTime;
+ valueChanged("avgReduceTime");
+ }
+
+ public int getNumTotalMaps() {
+ return numTotalMaps;
+ }
+
+ public void setNumTotalMaps(int numTotalMaps) {
+ this.numTotalMaps = numTotalMaps;
+ valueChanged("numTotalMaps");
+ }
+
+ public int getNumTotalReduces() {
+ return numTotalReduces;
+ }
+
+ public void setNumTotalReduces(int numTotalReduces) {
+ this.numTotalReduces = numTotalReduces;
+ valueChanged("numTotalReduces");
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public void setDuration(long duration) {
+ this.duration = duration;
+ valueChanged("duration");
+ }
+
}
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
index 3b91fbf..5549b56 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobRpcAnalysisStreamPublisher.java
@@ -47,6 +47,11 @@
fields.put("avgOpsPerMap", entity.getAvgOpsPerMap());
fields.put("avgOpsPerReduce", entity.getAvgOpsPerReduce());
fields.put("currentState", entity.getCurrentState());
+ fields.put("numTotalMaps", entity.getNumTotalMaps());
+ fields.put("numTotalReduces", entity.getNumTotalReduces());
+ fields.put("duration", entity.getDuration());
+ fields.put("avgMapTime", entity.getAvgMapTime());
+ fields.put("avgReduceTime", entity.getAvgReduceTime());
collector.collect(stormStreamId, new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields));
}
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 01c5e59..90c002a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -226,6 +226,26 @@
<defaultValue>0.0</defaultValue>
</column>
<column>
+ <name>avgMapTime</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>avgReduceTime</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>numTotalMaps</name>
+ <type>int</type>
+ </column>
+ <column>
+ <name>numTotalReduces</name>
+ <type>int</type>
+ </column>
+ <column>
+ <name>duration</name>
+ <type>long</type>
+ </column>
+ <column>
<name>currentState</name>
<type>string</type>
</column>