Revert "HADOOP-18402. S3A committer NPE in spark job abort (#4735)"

(managed to commit through the github ui before I'd got the message done)

This reverts commit ad83e95046b92540055f7caecf652c455ed2daf9.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
index 17f63e6..20024ba 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -50,17 +49,12 @@
    * @param jobContext job/task context.
    */
   public AuditContextUpdater(final JobContext jobContext) {
-    JobID contextJobID = jobContext.getJobID();
-    this.jobId = contextJobID != null
-            ? contextJobID.toString()
-            : null;
+    this.jobId = jobContext.getJobID().toString();
 
     if (jobContext instanceof TaskAttemptContext) {
       // it's a task, extract info for auditing
       final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
-      this.taskAttemptId = tid != null
-          ? tid.toString()
-          : null;
+      this.taskAttemptId = tid.toString();
     } else {
       this.taskAttemptId = null;
     }
@@ -76,11 +70,7 @@
    */
   public void updateCurrentAuditContext() {
     final CommonAuditContext auditCtx = currentAuditContext();
-    if (jobId != null) {
-      auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
-    } else {
-      currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
-    }
+    auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
     if (taskAttemptId != null) {
       auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
     } else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index c93d2d8..8ac3dcb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -40,7 +40,6 @@
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.JsonSerialization;
 import org.apache.hadoop.util.Preconditions;
@@ -157,12 +156,7 @@
     this.commitOperations = commitOperations;
     this.jobContext = jobContext;
     this.conf = jobContext.getConfiguration();
-    JobID contextJobID = jobContext.getJobID();
-    // either the job ID or make one up as it will be
-    // used for the filename of any reports.
-    this.jobId = contextJobID != null
-        ? contextJobID.toString()
-        : ("job-without-id-at-" + System.currentTimeMillis());
+    this.jobId = jobContext.getJobID().toString();
     this.collectIOStatistics = conf.getBoolean(
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);