HADOOP-18402. S3A committer NPE in spark job abort (#4735)

JobID.toString() and TaskID.toString() to only be called
when the IDs are not null.

This doesn't surface in MapReduce, but Spark SQL can trigger
in job abort, where it may invok abortJob() with an
incomplete TaskContext.

This patch MUST be applied to branches containing
HADOOP-17833. "Improve Magic Committer Performance."

Contributed by Steve Loughran.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
index 20024ba..17f63e6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -49,12 +50,17 @@
    * @param jobContext job/task context.
    */
   public AuditContextUpdater(final JobContext jobContext) {
-    this.jobId = jobContext.getJobID().toString();
+    JobID contextJobID = jobContext.getJobID();
+    this.jobId = contextJobID != null
+            ? contextJobID.toString()
+            : null;
 
     if (jobContext instanceof TaskAttemptContext) {
       // it's a task, extract info for auditing
       final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
-      this.taskAttemptId = tid.toString();
+      this.taskAttemptId = tid != null
+          ? tid.toString()
+          : null;
     } else {
       this.taskAttemptId = null;
     }
@@ -70,7 +76,11 @@
    */
   public void updateCurrentAuditContext() {
     final CommonAuditContext auditCtx = currentAuditContext();
-    auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+    if (jobId != null) {
+      auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+    } else {
+      currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
+    }
     if (taskAttemptId != null) {
       auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
     } else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index 8ac3dcb..c93d2d8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.JsonSerialization;
 import org.apache.hadoop.util.Preconditions;
@@ -156,7 +157,12 @@
     this.commitOperations = commitOperations;
     this.jobContext = jobContext;
     this.conf = jobContext.getConfiguration();
-    this.jobId = jobContext.getJobID().toString();
+    JobID contextJobID = jobContext.getJobID();
+    // either the job ID or make one up as it will be
+    // used for the filename of any reports.
+    this.jobId = contextJobID != null
+        ? contextJobID.toString()
+        : ("job-without-id-at-" + System.currentTimeMillis());
     this.collectIOStatistics = conf.getBoolean(
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);