Revert "HADOOP-18402. S3A committer NPE in spark job abort (#4735)"
(managed to commit through the github ui before I'd got the message done)
This reverts commit ad83e95046b92540055f7caecf652c455ed2daf9.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
index 17f63e6..20024ba 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java
@@ -22,7 +22,6 @@
import org.apache.hadoop.fs.audit.CommonAuditContext;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -50,17 +49,12 @@
* @param jobContext job/task context.
*/
public AuditContextUpdater(final JobContext jobContext) {
- JobID contextJobID = jobContext.getJobID();
- this.jobId = contextJobID != null
- ? contextJobID.toString()
- : null;
+ this.jobId = jobContext.getJobID().toString();
if (jobContext instanceof TaskAttemptContext) {
// it's a task, extract info for auditing
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
- this.taskAttemptId = tid != null
- ? tid.toString()
- : null;
+ this.taskAttemptId = tid.toString();
} else {
this.taskAttemptId = null;
}
@@ -76,11 +70,7 @@
*/
public void updateCurrentAuditContext() {
final CommonAuditContext auditCtx = currentAuditContext();
- if (jobId != null) {
- auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
- } else {
- currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
- }
+ auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
if (taskAttemptId != null) {
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
} else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index c93d2d8..8ac3dcb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.JsonSerialization;
import org.apache.hadoop.util.Preconditions;
@@ -157,12 +156,7 @@
this.commitOperations = commitOperations;
this.jobContext = jobContext;
this.conf = jobContext.getConfiguration();
- JobID contextJobID = jobContext.getJobID();
- // either the job ID or make one up as it will be
- // used for the filename of any reports.
- this.jobId = contextJobID != null
- ? contextJobID.toString()
- : ("job-without-id-at-" + System.currentTimeMillis());
+ this.jobId = jobContext.getJobID().toString();
this.collectIOStatistics = conf.getBoolean(
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);