| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.lib.jobcontrol; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.HadoopTestCase; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.MapReduceTestUtil; |
| |
| /** |
| * This class performs unit test for Job/JobControl classes. |
| * |
| */ |
| public class TestMapReduceJobControl extends HadoopTestCase { |
| |
| static Path rootDataDir = new Path( |
| System.getProperty("test.build.data", "."), "TestData"); |
| static Path indir = new Path(rootDataDir, "indir"); |
| static Path outdir_1 = new Path(rootDataDir, "outdir_1"); |
| static Path outdir_2 = new Path(rootDataDir, "outdir_2"); |
| static Path outdir_3 = new Path(rootDataDir, "outdir_3"); |
| static Path outdir_4 = new Path(rootDataDir, "outdir_4"); |
| static ControlledJob cjob1 = null; |
| static ControlledJob cjob2 = null; |
| static ControlledJob cjob3 = null; |
| static ControlledJob cjob4 = null; |
| |
| public TestMapReduceJobControl() throws IOException { |
| super(HadoopTestCase.LOCAL_MR , HadoopTestCase.LOCAL_FS, 2, 2); |
| } |
| |
| private void cleanupData(Configuration conf) throws Exception { |
| FileSystem fs = FileSystem.get(conf); |
| MapReduceTestUtil.cleanData(fs, indir); |
| MapReduceTestUtil.generateData(fs, indir); |
| |
| MapReduceTestUtil.cleanData(fs, outdir_1); |
| MapReduceTestUtil.cleanData(fs, outdir_2); |
| MapReduceTestUtil.cleanData(fs, outdir_3); |
| MapReduceTestUtil.cleanData(fs, outdir_4); |
| } |
| |
| /** |
| * This is a main function for testing JobControl class. |
| * It requires 4 jobs: |
| * Job 1: passed as parameter. input:indir output:outdir_1 |
| * Job 2: copy data from indir to outdir_2 |
| * Job 3: copy data from outdir_1 and outdir_2 to outdir_3 |
| * Job 4: copy data from outdir to outdir_4 |
| * The jobs 1 and 2 have no dependency. The job 3 depends on jobs 1 and 2. |
| * The job 4 depends on job 3. |
| * |
| * Then it creates a JobControl object and add the 4 jobs to |
| * the JobControl object. |
| * Finally, it creates a thread to run the JobControl object |
| */ |
| private JobControl createDependencies(Configuration conf, Job job1) |
| throws Exception { |
| List<ControlledJob> dependingJobs = null; |
| cjob1 = new ControlledJob(job1, dependingJobs); |
| Job job2 = MapReduceTestUtil.createCopyJob(conf, outdir_2, indir); |
| cjob2 = new ControlledJob(job2, dependingJobs); |
| |
| Job job3 = MapReduceTestUtil.createCopyJob(conf, outdir_3, |
| outdir_1, outdir_2); |
| dependingJobs = new ArrayList<ControlledJob>(); |
| dependingJobs.add(cjob1); |
| dependingJobs.add(cjob2); |
| cjob3 = new ControlledJob(job3, dependingJobs); |
| |
| Job job4 = MapReduceTestUtil.createCopyJob(conf, outdir_4, outdir_3); |
| dependingJobs = new ArrayList<ControlledJob>(); |
| dependingJobs.add(cjob3); |
| cjob4 = new ControlledJob(job4, dependingJobs); |
| |
| JobControl theControl = new JobControl("Test"); |
| theControl.addJob(cjob1); |
| theControl.addJob(cjob2); |
| theControl.addJob(cjob3); |
| theControl.addJob(cjob4); |
| Thread theController = new Thread(theControl); |
| theController.start(); |
| return theControl; |
| } |
| |
| private void waitTillAllFinished(JobControl theControl) { |
| while (!theControl.allFinished()) { |
| try { |
| Thread.sleep(100); |
| } catch (Exception e) {} |
| } |
| } |
| |
| public void testJobControlWithFailJob() throws Exception { |
| Configuration conf = createJobConf(); |
| |
| cleanupData(conf); |
| |
| // create a Fail job |
| Job job1 = MapReduceTestUtil.createFailJob(conf, outdir_1, indir); |
| |
| // create job dependencies |
| JobControl theControl = createDependencies(conf, job1); |
| |
| // wait till all the jobs complete |
| waitTillAllFinished(theControl); |
| |
| assertTrue(cjob1.getJobState() == ControlledJob.State.FAILED); |
| assertTrue(cjob2.getJobState() == ControlledJob.State.SUCCESS); |
| assertTrue(cjob3.getJobState() == ControlledJob.State.DEPENDENT_FAILED); |
| assertTrue(cjob4.getJobState() == ControlledJob.State.DEPENDENT_FAILED); |
| |
| theControl.stop(); |
| } |
| |
| public void testJobControlWithKillJob() throws Exception { |
| Configuration conf = createJobConf(); |
| cleanupData(conf); |
| Job job1 = MapReduceTestUtil.createKillJob(conf, outdir_1, indir); |
| JobControl theControl = createDependencies(conf, job1); |
| |
| while (cjob1.getJobState() != ControlledJob.State.RUNNING) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| break; |
| } |
| } |
| // verify adding dependingJo to RUNNING job fails. |
| assertFalse(cjob1.addDependingJob(cjob2)); |
| |
| // suspend jobcontrol and resume it again |
| theControl.suspend(); |
| assertTrue( |
| theControl.getThreadState() == JobControl.ThreadState.SUSPENDED); |
| theControl.resume(); |
| |
| // kill the first job. |
| cjob1.killJob(); |
| |
| // wait till all the jobs complete |
| waitTillAllFinished(theControl); |
| |
| assertTrue(cjob1.getJobState() == ControlledJob.State.FAILED); |
| assertTrue(cjob2.getJobState() == ControlledJob.State.SUCCESS); |
| assertTrue(cjob3.getJobState() == ControlledJob.State.DEPENDENT_FAILED); |
| assertTrue(cjob4.getJobState() == ControlledJob.State.DEPENDENT_FAILED); |
| |
| theControl.stop(); |
| } |
| |
| public void testJobControl() throws Exception { |
| Configuration conf = createJobConf(); |
| |
| cleanupData(conf); |
| |
| Job job1 = MapReduceTestUtil.createCopyJob(conf, outdir_1, indir); |
| |
| JobControl theControl = createDependencies(conf, job1); |
| |
| // wait till all the jobs complete |
| waitTillAllFinished(theControl); |
| |
| assertEquals("Some jobs failed", 0, theControl.getFailedJobList().size()); |
| |
| theControl.stop(); |
| } |
| } |