HADOOP-18402. S3A committer NPE in spark job abort (#4735)

Ensure jobId.toString() is only called when the ID isn't null.

This doesn't surface in MR, but Spark seems to manage it.
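
For illustration only, not part of the patch: a minimal sketch of the
null-safe pattern this change applies, assuming a JobContext whose
getJobID() can come back null, as Spark's committer plumbing may supply
during abort. The JobIdNames class and jobIdOrFallback method are
hypothetical names used for illustration, not code in hadoop-aws.

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;

    // hypothetical helper, mirroring the guard added in CommitContext
    final class JobIdNames {
      private JobIdNames() {
      }

      /** Null-safe job ID string, falling back to a generated name. */
      static String jobIdOrFallback(JobContext jobContext) {
        JobID contextJobID = jobContext.getJobID();  // may be null outside MR
        return contextJobID != null
            ? contextJobID.toString()
            : ("job-without-id-at-" + System.currentTimeMillis());
      }
    }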

Change-Id: I06692ef30a4af510c660d7222292932a8d4b5147
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
index 20024ba..17f63e6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -49,12 +50,17 @@
    * @param jobContext job/task context.
    */
   public AuditContextUpdater(final JobContext jobContext) {
-    this.jobId = jobContext.getJobID().toString();
+    JobID contextJobID = jobContext.getJobID();
+    this.jobId = contextJobID != null
+            ? contextJobID.toString()
+            : null;
 
     if (jobContext instanceof TaskAttemptContext) {
       // it's a task, extract info for auditing
       final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
-      this.taskAttemptId = tid.toString();
+      this.taskAttemptId = tid != null
+          ? tid.toString()
+          : null;
     } else {
       this.taskAttemptId = null;
     }
@@ -70,7 +76,11 @@
    */
   public void updateCurrentAuditContext() {
     final CommonAuditContext auditCtx = currentAuditContext();
-    auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+    if (jobId != null) {
+      auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+    } else {
+      currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
+    }
     if (taskAttemptId != null) {
       auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
     } else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index 8ac3dcb..c93d2d8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.JsonSerialization;
 import org.apache.hadoop.util.Preconditions;
@@ -156,7 +157,12 @@
     this.commitOperations = commitOperations;
     this.jobContext = jobContext;
     this.conf = jobContext.getConfiguration();
-    this.jobId = jobContext.getJobID().toString();
+    JobID contextJobID = jobContext.getJobID();
+    // use the job ID if present, or make one up, as it will be
+    // used for the filename of any reports.
+    this.jobId = contextJobID != null
+        ? contextJobID.toString()
+        : ("job-without-id-at-" + System.currentTimeMillis());
     this.collectIOStatistics = conf.getBoolean(
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);