| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; |
| import org.apache.hadoop.net.DNSToSwitchMapping; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.net.StaticMapping; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.UserGroupInformation; |
| |
| /** |
| * This class creates a single-process Map-Reduce cluster for junit testing. |
| * One thread is created for each server. |
| */ |
| public class MiniMRCluster { |
| private static final Log LOG = LogFactory.getLog(MiniMRCluster.class); |
| |
| private Thread jobTrackerThread; |
| private JobTrackerRunner jobTracker; |
| |
| private int jobTrackerPort = 0; |
| private int taskTrackerPort = 0; |
| private int jobTrackerInfoPort = 0; |
| private int numTaskTrackers; |
| |
| private List<TaskTrackerRunner> taskTrackerList = new ArrayList<TaskTrackerRunner>(); |
| private List<Thread> taskTrackerThreadList = new ArrayList<Thread>(); |
| |
| private String namenode; |
| private UserGroupInformation ugi = null; |
| private JobConf conf; |
| private int numTrackerToExclude; |
| |
| private JobConf job; |
| private Clock clock; |
| |
| /** |
| * An inner class that runs a job tracker. |
| */ |
| public class JobTrackerRunner implements Runnable { |
| private JobTracker tracker = null; |
| private volatile boolean isActive = true; |
| |
| JobConf jc = null; |
| Clock clock = JobTracker.DEFAULT_CLOCK; |
| |
| public JobTrackerRunner(JobConf conf) { |
| jc = conf; |
| } |
| |
| public JobTrackerRunner(JobConf conf, Clock clock) { |
| jc = conf; |
| this.clock = clock; |
| } |
| |
| public boolean isUp() { |
| return (tracker != null); |
| } |
| |
| public boolean isActive() { |
| return isActive; |
| } |
| |
| public int getJobTrackerPort() { |
| return tracker.getTrackerPort(); |
| } |
| |
| public int getJobTrackerInfoPort() { |
| return tracker.getInfoPort(); |
| } |
| |
| public JobTracker getJobTracker() { |
| return tracker; |
| } |
| |
| /** |
| * Create the job tracker and run it. |
| */ |
| public void run() { |
| try { |
| jc = (jc == null) ? createJobConf() : createJobConf(jc); |
| File f = new File("build/test/mapred/local").getAbsoluteFile(); |
| jc.set(MRConfig.LOCAL_DIR, f.getAbsolutePath()); |
| jc.setClass("topology.node.switch.mapping.impl", |
| StaticMapping.class, DNSToSwitchMapping.class); |
| final String id = |
| new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()); |
| if (ugi == null) { |
| ugi = UserGroupInformation.getLoginUser(); |
| } |
| tracker = ugi.doAs(new PrivilegedExceptionAction<JobTracker>() { |
| public JobTracker run() throws InterruptedException, IOException { |
| return JobTracker.startTracker(jc, clock, id); |
| } |
| }); |
| tracker.offerService(); |
| } catch (Throwable e) { |
| LOG.error("Job tracker crashed", e); |
| isActive = false; |
| } |
| } |
| |
| /** |
| * Shutdown the job tracker and wait for it to finish. |
| */ |
| public void shutdown() { |
| try { |
| if (tracker != null) { |
| tracker.stopTracker(); |
| } |
| } catch (Throwable e) { |
| LOG.error("Problem shutting down job tracker", e); |
| } |
| isActive = false; |
| } |
| } |
| |
| /** |
| * An inner class to run the task tracker. |
| */ |
| class TaskTrackerRunner implements Runnable { |
| volatile TaskTracker tt; |
| int trackerId; |
| // the localDirs for this taskTracker |
| String[] localDirs; |
| volatile boolean isInitialized = false; |
| volatile boolean isDead = false; |
| int numDir; |
| |
| TaskTrackerRunner(int trackerId, int numDir, String hostname, |
| JobConf cfg) |
| throws IOException { |
| this.trackerId = trackerId; |
| this.numDir = numDir; |
| localDirs = new String[numDir]; |
| final JobConf conf; |
| if (cfg == null) { |
| conf = createJobConf(); |
| } else { |
| conf = createJobConf(cfg); |
| } |
| if (hostname != null) { |
| conf.set(TTConfig.TT_HOST_NAME, hostname); |
| } |
| conf.set(TTConfig.TT_HTTP_ADDRESS, "0.0.0.0:0"); |
| conf.set(TTConfig.TT_REPORT_ADDRESS, |
| "127.0.0.1:" + taskTrackerPort); |
| File localDirBase = |
| new File(conf.get(MRConfig.LOCAL_DIR)).getAbsoluteFile(); |
| localDirBase.mkdirs(); |
| StringBuffer localPath = new StringBuffer(); |
| for(int i=0; i < numDir; ++i) { |
| File ttDir = new File(localDirBase, |
| Integer.toString(trackerId) + "_" + i); |
| if (!ttDir.mkdirs()) { |
| if (!ttDir.isDirectory()) { |
| throw new IOException("Mkdirs failed to create " + ttDir); |
| } |
| } |
| localDirs[i] = ttDir.toString(); |
| if (i != 0) { |
| localPath.append(","); |
| } |
| localPath.append(localDirs[i]); |
| } |
| conf.set(MRConfig.LOCAL_DIR, localPath.toString()); |
| LOG.info(MRConfig.LOCAL_DIR + " is " + localPath); |
| try { |
| tt = ugi.doAs(new PrivilegedExceptionAction<TaskTracker>() { |
| public TaskTracker run() throws InterruptedException, IOException { |
| return createTaskTracker(conf); |
| } |
| }); |
| isInitialized = true; |
| } catch (Throwable e) { |
| isDead = true; |
| tt = null; |
| LOG.error("task tracker " + trackerId + " crashed", e); |
| } |
| } |
| |
| /** |
| * Creates a default {@link TaskTracker} using the conf passed. |
| */ |
| TaskTracker createTaskTracker(JobConf conf) |
| throws IOException, InterruptedException { |
| return new TaskTracker(conf); |
| } |
| |
| /** |
| * Create and run the task tracker. |
| */ |
| public void run() { |
| try { |
| if (tt != null) { |
| tt.run(); |
| } |
| } catch (Throwable e) { |
| isDead = true; |
| tt = null; |
| LOG.error("task tracker " + trackerId + " crashed", e); |
| } |
| } |
| |
| /** |
| * Get the local dir for this TaskTracker. |
| * This is there so that we do not break |
| * previous tests. |
| * @return the absolute pathname |
| */ |
| public String getLocalDir() { |
| return localDirs[0]; |
| } |
| |
| public String[] getLocalDirs(){ |
| return localDirs; |
| } |
| |
| public TaskTracker getTaskTracker() { |
| return tt; |
| } |
| |
| /** |
| * Shut down the server and wait for it to finish. |
| */ |
| public void shutdown() { |
| if (tt != null) { |
| try { |
| tt.shutdown(); |
| } catch (Throwable e) { |
| LOG.error("task tracker " + trackerId + " could not shut down", |
| e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Get the local directory for the Nth task tracker |
| * @param taskTracker the index of the task tracker to check |
| * @return the absolute pathname of the local dir |
| */ |
| public String getTaskTrackerLocalDir(int taskTracker) { |
| return (taskTrackerList.get(taskTracker)).getLocalDir(); |
| } |
| |
| /** |
| * Get all the local directories for the Nth task tracker |
| * @param taskTracker the index of the task tracker to check |
| * @return array of local dirs |
| */ |
| public String[] getTaskTrackerLocalDirs(int taskTracker) { |
| return (taskTrackerList.get(taskTracker)).getLocalDirs(); |
| } |
| |
| public JobTrackerRunner getJobTrackerRunner() { |
| return jobTracker; |
| } |
| |
| TaskTrackerRunner getTaskTrackerRunner(int id) { |
| return taskTrackerList.get(id); |
| } |
| /** |
| * Get the number of task trackers in the cluster |
| */ |
| public int getNumTaskTrackers() { |
| return taskTrackerList.size(); |
| } |
| |
| /** |
| * Sets inline cleanup threads to all task trackers sothat deletion of |
| * temporary files/dirs happen inline |
| */ |
| public void setInlineCleanupThreads() { |
| for (int i = 0; i < getNumTaskTrackers(); i++) { |
| getTaskTrackerRunner(i).getTaskTracker().setCleanupThread( |
| new UtilsForTests.InlineCleanupQueue()); |
| } |
| } |
| |
| /** |
| * Wait until the system is idle. |
| */ |
| public void waitUntilIdle() { |
| waitTaskTrackers(); |
| |
| JobClient client; |
| try { |
| client = new JobClient(job); |
| ClusterStatus status = client.getClusterStatus(); |
| while(status.getTaskTrackers() + numTrackerToExclude |
| < taskTrackerList.size()) { |
| for(TaskTrackerRunner runner : taskTrackerList) { |
| if(runner.isDead) { |
| throw new RuntimeException("TaskTracker is dead"); |
| } |
| } |
| Thread.sleep(1000); |
| status = client.getClusterStatus(); |
| } |
| } |
| catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| catch (InterruptedException ex) { |
| throw new RuntimeException(ex); |
| } |
| |
| } |
| |
| private void waitTaskTrackers() { |
| for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) { |
| TaskTrackerRunner runner = itr.next(); |
| while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) { |
| if (!runner.isInitialized) { |
| LOG.info("Waiting for task tracker to start."); |
| } else { |
| LOG.info("Waiting for task tracker " + runner.tt.getName() + |
| " to be idle."); |
| } |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ie) {} |
| } |
| } |
| } |
| |
| /** |
| * Get the actual rpc port used. |
| */ |
| public int getJobTrackerPort() { |
| return jobTrackerPort; |
| } |
| |
| public JobConf createJobConf() { |
| return createJobConf(new JobConf()); |
| } |
| |
| public JobConf createJobConf(JobConf conf) { |
| if(conf == null) { |
| conf = new JobConf(); |
| } |
| return configureJobConf(conf, namenode, jobTrackerPort, jobTrackerInfoPort, |
| ugi); |
| } |
| |
| static JobConf configureJobConf(JobConf conf, String namenode, |
| int jobTrackerPort, int jobTrackerInfoPort, |
| UserGroupInformation ugi) { |
| JobConf result = new JobConf(conf); |
| FileSystem.setDefaultUri(result, namenode); |
| result.set(JTConfig.JT_IPC_ADDRESS, "localhost:"+jobTrackerPort); |
| result.set(JTConfig.JT_HTTP_ADDRESS, |
| "127.0.0.1:" + jobTrackerInfoPort); |
| // for debugging have all task output sent to the test output |
| JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL); |
| return result; |
| } |
| |
| /** |
| * Create the config and the cluster. |
| * @param numTaskTrackers no. of tasktrackers in the cluster |
| * @param namenode the namenode |
| * @param numDir no. of directories |
| * @throws IOException |
| */ |
| public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, |
| String[] racks, String[] hosts) throws IOException { |
| this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts); |
| } |
| |
| /** |
| * Create the config and the cluster. |
| * @param numTaskTrackers no. of tasktrackers in the cluster |
| * @param namenode the namenode |
| * @param numDir no. of directories |
| * @param racks Array of racks |
| * @param hosts Array of hosts in the corresponding racks |
| * @param conf Default conf for the jobtracker |
| * @throws IOException |
| */ |
| public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, |
| String[] racks, String[] hosts, JobConf conf) |
| throws IOException { |
| this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf); |
| } |
| |
| /** |
| * Create the config and the cluster. |
| * @param numTaskTrackers no. of tasktrackers in the cluster |
| * @param namenode the namenode |
| * @param numDir no. of directories |
| * @throws IOException |
| */ |
| public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) |
| throws IOException { |
| this(0, 0, numTaskTrackers, namenode, numDir); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, |
| int taskTrackerPort, |
| int numTaskTrackers, |
| String namenode, |
| int numDir) |
| throws IOException { |
| this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
| numDir, null); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, |
| int taskTrackerPort, |
| int numTaskTrackers, |
| String namenode, |
| int numDir, |
| String[] racks) throws IOException { |
| this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
| numDir, racks, null); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, |
| int taskTrackerPort, |
| int numTaskTrackers, |
| String namenode, |
| int numDir, |
| String[] racks, String[] hosts) throws IOException { |
| this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
| numDir, racks, hosts, null); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, |
| int numTaskTrackers, String namenode, |
| int numDir, String[] racks, String[] hosts, UserGroupInformation ugi |
| ) throws IOException { |
| this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
| numDir, racks, hosts, ugi, null); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, |
| int numTaskTrackers, String namenode, |
| int numDir, String[] racks, String[] hosts, UserGroupInformation ugi, |
| JobConf conf) throws IOException { |
| this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, |
| racks, hosts, ugi, conf, 0); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, |
| int numTaskTrackers, String namenode, |
| int numDir, String[] racks, String[] hosts, UserGroupInformation ugi, |
| JobConf conf, int numTrackerToExclude) throws IOException { |
| this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, |
| racks, hosts, ugi, conf, numTrackerToExclude, new Clock()); |
| } |
| |
| public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, |
| int numTaskTrackers, String namenode, |
| int numDir, String[] racks, String[] hosts, UserGroupInformation ugi, |
| JobConf conf, int numTrackerToExclude, Clock clock) throws IOException { |
| if (racks != null && racks.length < numTaskTrackers) { |
| LOG.error("Invalid number of racks specified. It should be at least " + |
| "equal to the number of tasktrackers"); |
| shutdown(); |
| } |
| if (hosts != null && numTaskTrackers > hosts.length ) { |
| throw new IllegalArgumentException( "The length of hosts [" + hosts.length |
| + "] is less than the number of tasktrackers [" + numTaskTrackers + "]."); |
| } |
| |
| //Generate rack names if required |
| if (racks == null) { |
| System.out.println("Generating rack names for tasktrackers"); |
| racks = new String[numTaskTrackers]; |
| for (int i=0; i < racks.length; ++i) { |
| racks[i] = NetworkTopology.DEFAULT_RACK; |
| } |
| } |
| |
| //Generate some hostnames if required |
| if (hosts == null) { |
| System.out.println("Generating host names for tasktrackers"); |
| hosts = new String[numTaskTrackers]; |
| for (int i = 0; i < numTaskTrackers; i++) { |
| hosts[i] = "host" + i + ".foo.com"; |
| } |
| } |
| this.jobTrackerPort = jobTrackerPort; |
| this.taskTrackerPort = taskTrackerPort; |
| this.jobTrackerInfoPort = 0; |
| this.numTaskTrackers = 0; |
| this.namenode = namenode; |
| this.ugi = ugi; |
| this.conf = conf; // this is the conf the mr starts with |
| this.numTrackerToExclude = numTrackerToExclude; |
| this.clock = clock; |
| |
| // start the jobtracker |
| startJobTracker(); |
| |
| // Create the TaskTrackers |
| for (int idx = 0; idx < numTaskTrackers; idx++) { |
| String rack = null; |
| String host = null; |
| if (racks != null) { |
| rack = racks[idx]; |
| } |
| if (hosts != null) { |
| host = hosts[idx]; |
| } |
| |
| startTaskTracker(host, rack, idx, numDir); |
| } |
| |
| this.job = createJobConf(conf); |
| waitUntilIdle(); |
| } |
| |
| public UserGroupInformation getUgi() { |
| return ugi; |
| } |
| |
| /** |
| * Get the task completion events |
| */ |
| public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, |
| int max) |
| throws IOException { |
| return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max); |
| } |
| |
| /** |
| * Change the job's priority |
| * |
| * @throws IOException |
| * @throws AccessControlException |
| */ |
| public void setJobPriority(JobID jobId, JobPriority priority) |
| throws AccessControlException, IOException { |
| jobTracker.getJobTracker().setJobPriority(jobId, priority); |
| } |
| |
| /** |
| * Get the job's priority |
| */ |
| public JobPriority getJobPriority(JobID jobId) { |
| return jobTracker.getJobTracker().getJob(jobId).getPriority(); |
| } |
| |
| /** |
| * Get the job finish time |
| */ |
| public long getJobFinishTime(JobID jobId) { |
| return jobTracker.getJobTracker().getJob(jobId).getFinishTime(); |
| } |
| |
| /** |
| * Init the job |
| */ |
| public void initializeJob(JobID jobId) throws IOException { |
| JobInProgress job = jobTracker.getJobTracker().getJob(jobId); |
| jobTracker.getJobTracker().initJob(job); |
| } |
| |
| /** |
| * Get the events list at the tasktracker |
| */ |
| public MapTaskCompletionEventsUpdate |
| getMapTaskCompletionEventsUpdates(int index, JobID jobId, int max) |
| throws IOException { |
| String jtId = jobTracker.getJobTracker().getTrackerIdentifier(); |
| TaskAttemptID dummy = |
| new TaskAttemptID(jtId, jobId.getId(), TaskType.REDUCE, 0, 0); |
| return taskTrackerList.get(index).getTaskTracker() |
| .getMapCompletionEvents(jobId, 0, max, |
| dummy); |
| } |
| |
| /** |
| * Get jobtracker conf |
| */ |
| public JobConf getJobTrackerConf() { |
| return this.conf; |
| } |
| |
| |
| public int getFaultCount(String hostName) { |
| return jobTracker.getJobTracker().getFaultCount(hostName); |
| } |
| |
| /** |
| * Start the jobtracker. |
| */ |
| public void startJobTracker() { |
| startJobTracker(true); |
| } |
| |
| public void startJobTracker(boolean wait) { |
| // Create the JobTracker |
| jobTracker = new JobTrackerRunner(conf, clock); |
| jobTrackerThread = new Thread(jobTracker); |
| |
| jobTrackerThread.start(); |
| |
| if (!wait) { |
| return; |
| } |
| |
| while (jobTracker.isActive() && !jobTracker.isUp()) { |
| try { // let daemons get started |
| Thread.sleep(1000); |
| } catch(InterruptedException e) { |
| } |
| } |
| |
| // is the jobtracker has started then wait for it to init |
| ClusterStatus status = null; |
| if (jobTracker.isUp()) { |
| status = jobTracker.getJobTracker().getClusterStatus(false); |
| while (jobTracker.isActive() && status.getJobTrackerStatus() |
| == JobTrackerStatus.INITIALIZING) { |
| try { |
| LOG.info("JobTracker still initializing. Waiting."); |
| Thread.sleep(1000); |
| } catch(InterruptedException e) {} |
| status = jobTracker.getJobTracker().getClusterStatus(false); |
| } |
| } |
| |
| if (!jobTracker.isActive()) { |
| // return if jobtracker has crashed |
| return; |
| } |
| |
| // Set the configuration for the task-trackers |
| this.jobTrackerPort = jobTracker.getJobTrackerPort(); |
| this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort(); |
| } |
| |
| /** |
| * Kill the jobtracker. |
| */ |
| public void stopJobTracker() { |
| //jobTracker.exit(-1); |
| jobTracker.shutdown(); |
| |
| jobTrackerThread.interrupt(); |
| try { |
| jobTrackerThread.join(); |
| } catch (InterruptedException ex) { |
| LOG.error("Problem waiting for job tracker to finish", ex); |
| } |
| } |
| |
| /** |
| * Kill the tasktracker. |
| */ |
| public void stopTaskTracker(int id) { |
| TaskTrackerRunner tracker = taskTrackerList.remove(id); |
| tracker.shutdown(); |
| |
| Thread thread = taskTrackerThreadList.remove(id); |
| |
| try { |
| thread.join(); |
| // This will break the wait until idle loop |
| tracker.isDead = true; |
| --numTaskTrackers; |
| } catch (InterruptedException ex) { |
| LOG.error("Problem waiting for task tracker to finish", ex); |
| } |
| } |
| |
| /** |
| * Start the tasktracker. |
| */ |
| public void startTaskTracker(String host, String rack, int idx, int numDir) |
| throws IOException { |
| if (rack != null) { |
| StaticMapping.addNodeToRack(host, rack); |
| } |
| if (host != null) { |
| NetUtils.addStaticResolution(host, "localhost"); |
| } |
| TaskTrackerRunner taskTracker; |
| taskTracker = new TaskTrackerRunner(idx, numDir, host, conf); |
| |
| addTaskTracker(taskTracker); |
| } |
| |
| /** |
| * Add a task-tracker to the Mini-MR cluster. |
| */ |
| void addTaskTracker(TaskTrackerRunner taskTracker) { |
| Thread taskTrackerThread = new Thread(taskTracker); |
| taskTrackerList.add(taskTracker); |
| taskTrackerThreadList.add(taskTrackerThread); |
| taskTrackerThread.start(); |
| ++numTaskTrackers; |
| } |
| |
| /** |
| * Get the tasktrackerID in MiniMRCluster with given trackerName. |
| */ |
| int getTaskTrackerID(String trackerName) { |
| for (int id=0; id < numTaskTrackers; id++) { |
| if (taskTrackerList.get(id).getTaskTracker().getName().equals( |
| trackerName)) { |
| return id; |
| } |
| } |
| return -1; |
| } |
| |
| /** |
| * Shut down the servers. |
| */ |
| public void shutdown() { |
| try { |
| waitTaskTrackers(); |
| for (int idx = 0; idx < numTaskTrackers; idx++) { |
| TaskTrackerRunner taskTracker = taskTrackerList.get(idx); |
| Thread taskTrackerThread = taskTrackerThreadList.get(idx); |
| taskTracker.shutdown(); |
| try { |
| taskTrackerThread.join(); |
| } catch (InterruptedException ex) { |
| LOG.error("Problem shutting down task tracker", ex); |
| } |
| } |
| stopJobTracker(); |
| } finally { |
| File configDir = new File("build", "minimr"); |
| File siteFile = new File(configDir, "mapred-site.xml"); |
| siteFile.delete(); |
| } |
| } |
| |
| public static void main(String[] args) throws IOException { |
| LOG.info("Bringing up Jobtracker and tasktrackers."); |
| MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1); |
| LOG.info("JobTracker and TaskTrackers are up."); |
| mr.shutdown(); |
| LOG.info("JobTracker and TaskTrackers brought down."); |
| } |
| } |
| |