| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.hs; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobReport; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobState; |
| import org.apache.hadoop.mapreduce.v2.app.job.Job; |
| import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; |
| import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; |
| import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; |
| import org.apache.hadoop.yarn.YarnException; |
| import org.apache.hadoop.yarn.service.AbstractService; |
| |
| /** |
| * Manages an in memory cache of parsed Job History files. |
| */ |
| public class CachedHistoryStorage extends AbstractService implements |
| HistoryStorage { |
| private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class); |
| |
| private Map<JobId, Job> loadedJobCache = null; |
| // The number of loaded jobs. |
| private int loadedJobCacheSize; |
| |
| private HistoryFileManager hsManager; |
| |
| @Override |
| public void setHistoryFileManager(HistoryFileManager hsManager) { |
| this.hsManager = hsManager; |
| } |
| |
| @SuppressWarnings("serial") |
| @Override |
| public void init(Configuration conf) throws YarnException { |
| LOG.info("CachedHistoryStorage Init"); |
| |
| loadedJobCacheSize = conf.getInt( |
| JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, |
| JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE); |
| |
| loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>( |
| loadedJobCacheSize + 1, 0.75f, true) { |
| @Override |
| public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) { |
| return super.size() > loadedJobCacheSize; |
| } |
| }); |
| |
| super.init(conf); |
| } |
| |
| public CachedHistoryStorage() { |
| super(CachedHistoryStorage.class.getName()); |
| } |
| |
| private Job loadJob(HistoryFileInfo fileInfo) { |
| try { |
| Job job = fileInfo.loadJob(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding " + job.getID() + " to loaded job cache"); |
| } |
| // We can clobber results here, but that should be OK, because it only |
| // means that we may have two identical copies of the same job floating |
| // around for a while. |
| loadedJobCache.put(job.getID(), job); |
| return job; |
| } catch (IOException e) { |
| throw new YarnException( |
| "Could not find/load job: " + fileInfo.getJobId(), e); |
| } |
| } |
| |
| @Override |
| public Job getFullJob(JobId jobId) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Looking for Job " + jobId); |
| } |
| try { |
| HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId); |
| Job result = null; |
| if (fileInfo != null) { |
| result = loadedJobCache.get(jobId); |
| if (result == null) { |
| result = loadJob(fileInfo); |
| } else if(fileInfo.isDeleted()) { |
| loadedJobCache.remove(jobId); |
| result = null; |
| } |
| } else { |
| loadedJobCache.remove(jobId); |
| } |
| return result; |
| } catch (IOException e) { |
| throw new YarnException(e); |
| } |
| } |
| |
| @Override |
| public Map<JobId, Job> getAllPartialJobs() { |
| LOG.debug("Called getAllPartialJobs()"); |
| SortedMap<JobId, Job> result = new TreeMap<JobId, Job>(); |
| try { |
| for (HistoryFileInfo mi : hsManager.getAllFileInfo()) { |
| if (mi != null) { |
| JobId id = mi.getJobId(); |
| result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); |
| } |
| } |
| } catch (IOException e) { |
| LOG.warn("Error trying to scan for all FileInfos", e); |
| throw new YarnException(e); |
| } |
| return result; |
| } |
| |
| @Override |
| public JobsInfo getPartialJobs(Long offset, Long count, String user, |
| String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, |
| JobState jobState) { |
| return getPartialJobs(getAllPartialJobs().values(), offset, count, user, |
| queue, sBegin, sEnd, fBegin, fEnd, jobState); |
| } |
| |
| public static JobsInfo getPartialJobs(Collection<Job> jobs, Long offset, |
| Long count, String user, String queue, Long sBegin, Long sEnd, |
| Long fBegin, Long fEnd, JobState jobState) { |
| JobsInfo allJobs = new JobsInfo(); |
| |
| if (sBegin == null || sBegin < 0) |
| sBegin = 0l; |
| if (sEnd == null) |
| sEnd = Long.MAX_VALUE; |
| if (fBegin == null || fBegin < 0) |
| fBegin = 0l; |
| if (fEnd == null) |
| fEnd = Long.MAX_VALUE; |
| if (offset == null || offset < 0) |
| offset = 0l; |
| if (count == null) |
| count = Long.MAX_VALUE; |
| |
| if (offset > jobs.size()) { |
| return allJobs; |
| } |
| |
| long at = 0; |
| long end = offset + count - 1; |
| if (end < 0) { // due to overflow |
| end = Long.MAX_VALUE; |
| } |
| |
| for (Job job : jobs) { |
| if (at > end) { |
| break; |
| } |
| |
| // can't really validate queue is a valid one since queues could change |
| if (queue != null && !queue.isEmpty()) { |
| if (!job.getQueueName().equals(queue)) { |
| continue; |
| } |
| } |
| |
| if (user != null && !user.isEmpty()) { |
| if (!job.getUserName().equals(user)) { |
| continue; |
| } |
| } |
| |
| JobReport report = job.getReport(); |
| |
| if (report.getStartTime() < sBegin || report.getStartTime() > sEnd) { |
| continue; |
| } |
| if (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd) { |
| continue; |
| } |
| if (jobState != null && jobState != report.getJobState()) { |
| continue; |
| } |
| |
| at++; |
| if ((at - 1) < offset) { |
| continue; |
| } |
| |
| JobInfo jobInfo = new JobInfo(job); |
| |
| allJobs.add(jobInfo); |
| } |
| return allJobs; |
| } |
| } |