HADOOP-18402. S3A committer NPE in spark job abort (#4735)
Call jobId.toString() only when the job ID isn't null.
This doesn't surface in MR, but Spark seems to trigger it.
Change-Id: I06692ef30a4af510c660d7222292932a8d4b5147
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
index 20024ba..17f63e6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.audit.CommonAuditContext;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -49,12 +50,17 @@
* @param jobContext job/task context.
*/
public AuditContextUpdater(final JobContext jobContext) {
- this.jobId = jobContext.getJobID().toString();
+ JobID contextJobID = jobContext.getJobID();
+ this.jobId = contextJobID != null
+ ? contextJobID.toString()
+ : null;
if (jobContext instanceof TaskAttemptContext) {
// it's a task, extract info for auditing
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
- this.taskAttemptId = tid.toString();
+ this.taskAttemptId = tid != null
+ ? tid.toString()
+ : null;
} else {
this.taskAttemptId = null;
}
@@ -70,7 +76,11 @@
*/
public void updateCurrentAuditContext() {
final CommonAuditContext auditCtx = currentAuditContext();
- auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+ if (jobId != null) {
+ auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+ } else {
+ currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
+ }
if (taskAttemptId != null) {
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
} else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index 8ac3dcb..c93d2d8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.JsonSerialization;
import org.apache.hadoop.util.Preconditions;
@@ -156,7 +157,12 @@
this.commitOperations = commitOperations;
this.jobContext = jobContext;
this.conf = jobContext.getConfiguration();
- this.jobId = jobContext.getJobID().toString();
+ JobID contextJobID = jobContext.getJobID();
+ // either the job ID or make one up as it will be
+ // used for the filename of any reports.
+ this.jobId = contextJobID != null
+ ? contextJobID.toString()
+ : ("job-without-id-at-" + System.currentTimeMillis());
this.collectIOStatistics = conf.getBoolean(
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);