| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred.gridmix; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapred.ClusterStatus; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.gridmix.Gridmix.Component; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobStatus; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.tools.rumen.JobStory; |
| |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * Component collecting the stats required by other components |
| * to make decisions. |
| * Single thread collector tries to collect the stats (currently cluster stats) |
| * and caches it internally. |
| * Components interested in these stats need to register themselves and will get |
| * notified either on every job completion event or some fixed time interval. |
| */ |
| public class Statistics implements Component<Statistics.JobStats> { |
| public static final Log LOG = LogFactory.getLog(Statistics.class); |
| |
| private final StatCollector statistics = new StatCollector(); |
| private JobClient cluster; |
| |
| //List of cluster status listeners. |
| private final List<StatListener<ClusterStats>> clusterStatlisteners = |
| new CopyOnWriteArrayList<StatListener<ClusterStats>>(); |
| |
| //List of job status listeners. |
| private final List<StatListener<JobStats>> jobStatListeners = |
| new CopyOnWriteArrayList<StatListener<JobStats>>(); |
| |
| // A map of job-sequence-id to job-stats of submitted jobs |
| private static final Map<Integer, JobStats> submittedJobsMap = |
| new ConcurrentHashMap<Integer, JobStats>(); |
| |
| // total number of map tasks submitted |
| private static volatile int numMapsSubmitted = 0; |
| |
| // total number of reduce tasks submitted |
| private static volatile int numReducesSubmitted = 0; |
| |
| private int completedJobsInCurrentInterval = 0; |
| private final int jtPollingInterval; |
| private volatile boolean shutdown = false; |
| private final int maxJobCompletedInInterval; |
| private static final String MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY = |
| "gridmix.max-jobs-completed-in-poll-interval"; |
| private final ReentrantLock lock = new ReentrantLock(); |
| private final Condition jobCompleted = lock.newCondition(); |
| private final CountDownLatch startFlag; |
| |
| public Statistics( |
| final Configuration conf, int pollingInterval, CountDownLatch startFlag) |
| throws IOException, InterruptedException { |
| UserGroupInformation ugi = UserGroupInformation.getLoginUser(); |
| this.cluster = ugi.doAs(new PrivilegedExceptionAction<JobClient>() { |
| public JobClient run() throws IOException { |
| return new JobClient(new JobConf(conf)); |
| } |
| }); |
| |
| this.jtPollingInterval = pollingInterval; |
| maxJobCompletedInInterval = conf.getInt( |
| MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1); |
| this.startFlag = startFlag; |
| } |
| |
| /** |
| * Generates a job stats. |
| */ |
| public static JobStats generateJobStats(Job job, JobStory jobdesc) { |
| int seq = GridmixJob.getJobSeqId(job); |
| // bail out if job description is missing for a job to be simulated |
| if (seq >= 0 && jobdesc == null) { |
| throw new IllegalArgumentException("JobStory not available for job " |
| + job.getJobID()); |
| } |
| |
| int maps = -1; |
| int reds = -1; |
| if (jobdesc != null) { |
| // Note that the ZombieJob will return a >= 0 value |
| maps = jobdesc.getNumberMaps(); |
| reds = jobdesc.getNumberReduces(); |
| } |
| return new JobStats(maps, reds, job); |
| } |
| |
| /** |
| * Add a submitted job for monitoring. |
| */ |
| public void addJobStats(JobStats stats) { |
| int seq = GridmixJob.getJobSeqId(stats.getJob()); |
| if (seq < 0) { |
| LOG.info("Not tracking job " + stats.getJob().getJobName() |
| + " as seq id is less than zero: " + seq); |
| return; |
| } |
| submittedJobsMap.put(seq, stats); |
| numMapsSubmitted += stats.getNoOfMaps(); |
| numReducesSubmitted += stats.getNoOfReds(); |
| } |
| |
| /** |
| * Used by JobMonitor to add the completed job. |
| */ |
| @Override |
| public void add(Statistics.JobStats job) { |
| //This thread will be notified initially by job-monitor incase of |
| //data generation. Ignore that as we are getting once the input is |
| //generated. |
| if (!statistics.isAlive()) { |
| return; |
| } |
| JobStats stat = submittedJobsMap.remove(GridmixJob.getJobSeqId(job.getJob())); |
| |
| // stat cannot be null |
| if (stat == null) { |
| LOG.error("[Statistics] Missing entry for job " |
| + job.getJob().getJobID()); |
| return; |
| } |
| |
| // update the total number of submitted map/reduce task count |
| numMapsSubmitted -= stat.getNoOfMaps(); |
| numReducesSubmitted -= stat.getNoOfReds(); |
| |
| completedJobsInCurrentInterval++; |
| //check if we have reached the maximum level of job completions. |
| if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| " Reached maximum limit of jobs in a polling interval " + |
| completedJobsInCurrentInterval); |
| } |
| completedJobsInCurrentInterval = 0; |
| lock.lock(); |
| try { |
| //Job is completed notify all the listeners. |
| for (StatListener<JobStats> l : jobStatListeners) { |
| l.update(stat); |
| } |
| this.jobCompleted.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| //TODO: We have just 2 types of listeners as of now . If no of listeners |
| //increase then we should move to map kind of model. |
| |
| public void addClusterStatsObservers(StatListener<ClusterStats> listener) { |
| clusterStatlisteners.add(listener); |
| } |
| |
| public void addJobStatsListeners(StatListener<JobStats> listener) { |
| this.jobStatListeners.add(listener); |
| } |
| |
| /** |
| * Attempt to start the service. |
| */ |
| @Override |
| public void start() { |
| statistics.start(); |
| } |
| |
| private class StatCollector extends Thread { |
| |
| StatCollector() { |
| super("StatsCollectorThread"); |
| } |
| |
| public void run() { |
| try { |
| startFlag.await(); |
| if (Thread.currentThread().isInterrupted()) { |
| return; |
| } |
| } catch (InterruptedException ie) { |
| LOG.error( |
| "Statistics Error while waiting for other threads to get ready ", ie); |
| return; |
| } |
| while (!shutdown) { |
| lock.lock(); |
| try { |
| jobCompleted.await(jtPollingInterval, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException ie) { |
| if (!shutdown) { |
| LOG.error("Statistics interrupt while waiting for completion of " |
| + "a job.", ie); |
| } |
| return; |
| } finally { |
| lock.unlock(); |
| } |
| |
| //Fetch cluster data only if required.i.e . |
| // only if there are clusterStats listener. |
| if (clusterStatlisteners.size() > 0) { |
| try { |
| ClusterStatus clusterStatus = cluster.getClusterStatus(); |
| updateAndNotifyClusterStatsListeners(clusterStatus); |
| } catch (IOException e) { |
| LOG.error( |
| "Statistics io exception while polling JT ", e); |
| return; |
| } |
| } |
| } |
| } |
| |
| private void updateAndNotifyClusterStatsListeners( |
| ClusterStatus clusterStatus) { |
| ClusterStats stats = ClusterStats.getClusterStats(); |
| stats.setClusterMetric(clusterStatus); |
| for (StatListener<ClusterStats> listener : clusterStatlisteners) { |
| listener.update(stats); |
| } |
| } |
| |
| } |
| |
| /** |
| * Wait until the service completes. It is assumed that either a |
| * {@link #shutdown} or {@link #abort} has been requested. |
| */ |
| @Override |
| public void join(long millis) throws InterruptedException { |
| statistics.join(millis); |
| } |
| |
| @Override |
| public void shutdown() { |
| shutdown = true; |
| submittedJobsMap.clear(); |
| clusterStatlisteners.clear(); |
| jobStatListeners.clear(); |
| statistics.interrupt(); |
| } |
| |
| @Override |
| public void abort() { |
| shutdown = true; |
| submittedJobsMap.clear(); |
| clusterStatlisteners.clear(); |
| jobStatListeners.clear(); |
| statistics.interrupt(); |
| } |
| |
| /** |
| * Class to encapsulate the JobStats information. |
| * Current we just need information about completedJob. |
| * TODO: In future we need to extend this to send more information. |
| */ |
| static class JobStats { |
| private final int noOfMaps; |
| private final int noOfReds; |
| private JobStatus currentStatus; |
| private final Job job; |
| |
| public JobStats(int noOfMaps,int numOfReds, Job job){ |
| this.job = job; |
| this.noOfMaps = noOfMaps; |
| this.noOfReds = numOfReds; |
| } |
| public int getNoOfMaps() { |
| return noOfMaps; |
| } |
| public int getNoOfReds() { |
| return noOfReds; |
| } |
| |
| /** |
| * Returns the job , |
| * We should not use job.getJobID it returns null in 20.1xx. |
| * Use (GridmixJob.getJobSeqId(job)) instead |
| * @return job |
| */ |
| public Job getJob() { |
| return job; |
| } |
| |
| /** |
| * Update the job statistics. |
| */ |
| public synchronized void updateJobStatus(JobStatus status) { |
| this.currentStatus = status; |
| } |
| |
| /** |
| * Get the current job status. |
| */ |
| public synchronized JobStatus getJobStatus() { |
| return currentStatus; |
| } |
| } |
| |
| static class ClusterStats { |
| private ClusterStatus status = null; |
| private static ClusterStats stats = new ClusterStats(); |
| |
| private ClusterStats() { |
| |
| } |
| |
| /** |
| * @return stats |
| */ |
| static ClusterStats getClusterStats() { |
| return stats; |
| } |
| |
| /** |
| * @param metrics |
| */ |
| void setClusterMetric(ClusterStatus metrics) { |
| this.status = metrics; |
| } |
| |
| /** |
| * @return metrics |
| */ |
| public ClusterStatus getStatus() { |
| return status; |
| } |
| |
| int getNumRunningJob() { |
| return submittedJobsMap.size(); |
| } |
| |
| /** |
| * @return runningWatitingJobs |
| */ |
| static Collection<JobStats> getRunningJobStats() { |
| return submittedJobsMap.values(); |
| } |
| |
| /** |
| * Returns the total number of submitted map tasks |
| */ |
| static int getSubmittedMapTasks() { |
| return numMapsSubmitted; |
| } |
| |
| /** |
| * Returns the total number of submitted reduce tasks |
| */ |
| static int getSubmittedReduceTasks() { |
| return numReducesSubmitted; |
| } |
| } |
| } |