blob: d70e6946fa734d7a26aa9f48e7e1806e3126cabc [file] [log] [blame]
diff --git a/pom.xml b/pom.xml
index 3c0bc9b..9bbee65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@
<ST4.version>4.0.4</ST4.version>
<tez.version>0.5.2</tez.version>
<super-csv.version>2.2.0</super-csv.version>
- <spark.version>1.3.1</spark.version>
+ <spark.version>1.5.1</spark.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 51772cd..52f4b9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -23,29 +23,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.spark.JavaSparkListener;
import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class JobMetricsListener implements SparkListener {
+public class JobMetricsListener extends JavaSparkListener {
private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
@@ -54,36 +40,6 @@
private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
@Override
- public void onExecutorRemoved(SparkListenerExecutorRemoved removed) {
-
- }
-
- @Override
- public void onExecutorAdded(SparkListenerExecutorAdded added) {
-
- }
-
- @Override
- public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
- }
-
- @Override
- public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
-
- }
-
- @Override
- public void onTaskStart(SparkListenerTaskStart taskStart) {
-
- }
-
- @Override
- public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
-
- }
-
- @Override
public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
int stageId = taskEnd.stageId();
int stageAttemptId = taskEnd.stageAttemptId();
@@ -119,46 +75,6 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
jobIdToStageId.put(jobId, intStageIds);
}
- @Override
- public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-
- }
-
- @Override
- public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
-
- }
-
- @Override
- public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
-
- }
-
- @Override
- public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
-
- }
-
- @Override
- public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
- }
-
- @Override
- public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
-
- }
-
- @Override
- public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
-
- }
-
- @Override
- public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
-
- }
-
public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
return allJobMetrics.get(jobId);
}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index b77c9e8..f5b1e48 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -43,26 +43,13 @@
import org.apache.hive.spark.client.rpc.Rpc;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.spark.JavaSparkListener;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -438,21 +425,11 @@ private void monitorJob(JavaFutureAction<?> job,
}
- private class ClientListener implements SparkListener {
+ private class ClientListener extends JavaSparkListener {
private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
@Override
- public void onExecutorRemoved(SparkListenerExecutorRemoved removed) {
-
- }
-
- @Override
- public void onExecutorAdded(SparkListenerExecutorAdded added) {
-
- }
-
- @Override
public void onJobStart(SparkListenerJobStart jobStart) {
synchronized (stageToJobId) {
for (int i = 0; i < jobStart.stageIds().length(); i++) {
@@ -500,39 +477,6 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
}
}
- @Override
- public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
-
- @Override
- public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
-
- @Override
- public void onTaskStart(SparkListenerTaskStart taskStart) { }
-
- @Override
- public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
-
- @Override
- public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
-
- @Override
- public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
-
- @Override
- public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
-
- @Override
- public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
-
- @Override
- public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
-
- @Override
- public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
-
- @Override
- public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
-
/**
* Returns the client job ID for the given Spark job ID.
*