blob: 37efbe1a73c01701dd2bc51ecf0a08bfe4d97eae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
/**
* Reads in history events from the JobHistoryFile and sends them out again
* to be recorded.
*/
public class JobHistoryCopyService extends CompositeService implements HistoryEventHandler {
private static final Log LOG = LogFactory.getLog(JobHistoryCopyService.class);
private final ApplicationAttemptId applicationAttemptId;
private final EventHandler handler;
private final JobId jobId;
public JobHistoryCopyService(ApplicationAttemptId applicationAttemptId,
EventHandler handler) {
super("JobHistoryCopyService");
this.applicationAttemptId = applicationAttemptId;
this.jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
this.handler = handler;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
public void handleEvent(HistoryEvent event) throws IOException {
//Skip over the AM Events this is handled elsewhere
if (!(event instanceof AMStartedEvent)) {
handler.handle(new JobHistoryEvent(jobId, event));
}
}
@Override
protected void serviceStart() throws Exception {
try {
//TODO should we parse on a background thread???
parse();
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
private void parse() throws IOException {
FSDataInputStream in = null;
try {
in = getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
} catch (IOException e) {
LOG.warn("error trying to open previous history file. No history data " +
"will be copied over.", e);
return;
}
JobHistoryParser parser = new JobHistoryParser(in);
parser.parse(this);
Exception parseException = parser.getParseException();
if (parseException != null) {
LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
}
public static FSDataInputStream getPreviousJobHistoryFileStream(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
}
}