| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.hs; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.TypeConverter; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobState; |
| import org.apache.hadoop.mapreduce.v2.app.job.Job; |
| import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; |
| import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.yarn.Clock; |
| import org.apache.hadoop.yarn.ClusterInfo; |
| import org.apache.hadoop.yarn.YarnException; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.service.AbstractService; |
| import org.apache.hadoop.yarn.service.Service; |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * Loads and manages the Job history cache. |
| */ |
| public class JobHistory extends AbstractService implements HistoryContext { |
| private static final Log LOG = LogFactory.getLog(JobHistory.class); |
| |
| public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("(" |
| + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?"); |
| public static final String OLD_SUFFIX = ".old"; |
| |
| // Time interval for the move thread. |
| private long moveThreadInterval; |
| |
| private Configuration conf; |
| |
| private ScheduledThreadPoolExecutor scheduledExecutor = null; |
| |
| private HistoryStorage storage = null; |
| private HistoryFileManager hsManager = null; |
| |
| @Override |
| public void init(Configuration conf) throws YarnException { |
| LOG.info("JobHistory Init"); |
| this.conf = conf; |
| this.appID = RecordFactoryProvider.getRecordFactory(conf) |
| .newRecordInstance(ApplicationId.class); |
| this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf) |
| .newRecordInstance(ApplicationAttemptId.class); |
| |
| moveThreadInterval = conf.getLong( |
| JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, |
| JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS); |
| |
| hsManager = new HistoryFileManager(); |
| hsManager.init(conf); |
| try { |
| hsManager.initExisting(); |
| } catch (IOException e) { |
| throw new YarnException("Failed to intialize existing directories", e); |
| } |
| |
| storage = ReflectionUtils.newInstance(conf.getClass( |
| JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class, |
| HistoryStorage.class), conf); |
| if (storage instanceof Service) { |
| ((Service) storage).init(conf); |
| } |
| storage.setHistoryFileManager(hsManager); |
| |
| super.init(conf); |
| } |
| |
| @Override |
| public void start() { |
| hsManager.start(); |
| if (storage instanceof Service) { |
| ((Service) storage).start(); |
| } |
| |
| scheduledExecutor = new ScheduledThreadPoolExecutor(2, |
| new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d") |
| .build()); |
| |
| scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(), |
| moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS); |
| |
| // Start historyCleaner |
| boolean startCleanerService = conf.getBoolean( |
| JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true); |
| if (startCleanerService) { |
| long runInterval = conf.getLong( |
| JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, |
| JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS); |
| scheduledExecutor |
| .scheduleAtFixedRate(new HistoryCleaner(), |
| 30 * 1000l, runInterval, TimeUnit.MILLISECONDS); |
| } |
| super.start(); |
| } |
| |
| @Override |
| public void stop() { |
| LOG.info("Stopping JobHistory"); |
| if (scheduledExecutor != null) { |
| LOG.info("Stopping History Cleaner/Move To Done"); |
| scheduledExecutor.shutdown(); |
| boolean interrupted = false; |
| long currentTime = System.currentTimeMillis(); |
| while (!scheduledExecutor.isShutdown() |
| && System.currentTimeMillis() > currentTime + 1000l && !interrupted) { |
| try { |
| Thread.sleep(20); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } |
| } |
| if (!scheduledExecutor.isShutdown()) { |
| LOG.warn("HistoryCleanerService/move to done shutdown may not have " + |
| "succeeded, Forcing a shutdown"); |
| scheduledExecutor.shutdownNow(); |
| } |
| } |
| if (storage instanceof Service) { |
| ((Service) storage).stop(); |
| } |
| hsManager.stop(); |
| super.stop(); |
| } |
| |
| public JobHistory() { |
| super(JobHistory.class.getName()); |
| } |
| |
| @Override |
| public String getApplicationName() { |
| return "Job History Server"; |
| } |
| |
| private class MoveIntermediateToDoneRunnable implements Runnable { |
| @Override |
| public void run() { |
| try { |
| LOG.info("Starting scan to move intermediate done files"); |
| hsManager.scanIntermediateDirectory(); |
| } catch (IOException e) { |
| LOG.error("Error while scanning intermediate done dir ", e); |
| } |
| } |
| } |
| |
| private class HistoryCleaner implements Runnable { |
| public void run() { |
| LOG.info("History Cleaner started"); |
| try { |
| hsManager.clean(); |
| } catch (IOException e) { |
| LOG.warn("Error trying to clean up ", e); |
| } |
| LOG.info("History Cleaner complete"); |
| } |
| } |
| |
| /** |
| * Helper method for test cases. |
| */ |
| HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException { |
| return hsManager.getFileInfo(jobId); |
| } |
| |
| @Override |
| public Job getJob(JobId jobId) { |
| return storage.getFullJob(jobId); |
| } |
| |
| @Override |
| public Map<JobId, Job> getAllJobs(ApplicationId appID) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Called getAllJobs(AppId): " + appID); |
| } |
| // currently there is 1 to 1 mapping between app and job id |
| org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID); |
| Map<JobId, Job> jobs = new HashMap<JobId, Job>(); |
| JobId jobID = TypeConverter.toYarn(oldJobID); |
| jobs.put(jobID, getJob(jobID)); |
| return jobs; |
| } |
| |
| @Override |
| public Map<JobId, Job> getAllJobs() { |
| return storage.getAllPartialJobs(); |
| } |
| |
| /** |
| * Look for a set of partial jobs. |
| * |
| * @param offset |
| * the offset into the list of jobs. |
| * @param count |
| * the maximum number of jobs to return. |
| * @param user |
| * only return jobs for the given user. |
| * @param queue |
| * only return jobs for in the given queue. |
| * @param sBegin |
| * only return Jobs that started on or after the given time. |
| * @param sEnd |
| * only return Jobs that started on or before the given time. |
| * @param fBegin |
| * only return Jobs that ended on or after the given time. |
| * @param fEnd |
| * only return Jobs that ended on or before the given time. |
| * @param jobState |
| * only return jobs that are in the give job state. |
| * @return The list of filtered jobs. |
| */ |
| @Override |
| public JobsInfo getPartialJobs(Long offset, Long count, String user, |
| String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, |
| JobState jobState) { |
| return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd, |
| fBegin, fEnd, jobState); |
| } |
| |
| // TODO AppContext - Not Required |
| private ApplicationAttemptId appAttemptID; |
| |
| @Override |
| public ApplicationAttemptId getApplicationAttemptId() { |
| // TODO fixme - bogus appAttemptID for now |
| return appAttemptID; |
| } |
| |
| // TODO AppContext - Not Required |
| private ApplicationId appID; |
| |
| @Override |
| public ApplicationId getApplicationID() { |
| // TODO fixme - bogus appID for now |
| return appID; |
| } |
| |
| // TODO AppContext - Not Required |
| @Override |
| public EventHandler getEventHandler() { |
| // TODO Auto-generated method stub |
| return null; |
| } |
| |
| // TODO AppContext - Not Required |
| private String userName; |
| |
| @Override |
| public CharSequence getUser() { |
| if (userName != null) { |
| userName = conf.get(MRJobConfig.USER_NAME, "history-user"); |
| } |
| return userName; |
| } |
| |
| // TODO AppContext - Not Required |
| @Override |
| public Clock getClock() { |
| return null; |
| } |
| |
| // TODO AppContext - Not Required |
| @Override |
| public ClusterInfo getClusterInfo() { |
| return null; |
| } |
| } |