blob: 985eea3f652060f9f53b83d31b2f6c194593e948 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.tools;
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;
public class TestCLI {
private static String jobIdStr = "job_1015298225799_0015";
@Test
public void testListAttemptIdsWithValidInput() throws Exception {
JobID jobId = JobID.forName(jobIdStr);
Cluster mockCluster = mock(Cluster.class);
Job job = mock(Job.class);
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
when(job.getTaskReports(TaskType.MAP)).thenReturn(
getTaskReports(jobId, TaskType.MAP));
when(job.getTaskReports(TaskType.REDUCE)).thenReturn(
getTaskReports(jobId, TaskType.REDUCE));
when(mockCluster.getJob(jobId)).thenReturn(job);
int retCode_MAP = cli.run(new String[] { "-list-attempt-ids", jobIdStr,
"MAP", "running" });
// testing case insensitive behavior
int retCode_map = cli.run(new String[] { "-list-attempt-ids", jobIdStr,
"map", "running" });
int retCode_REDUCE = cli.run(new String[] { "-list-attempt-ids", jobIdStr,
"REDUCE", "running" });
int retCode_completed = cli.run(new String[] { "-list-attempt-ids",
jobIdStr, "REDUCE", "completed" });
assertEquals("MAP is a valid input,exit code should be 0", 0, retCode_MAP);
assertEquals("map is a valid input,exit code should be 0", 0, retCode_map);
assertEquals("REDUCE is a valid input,exit code should be 0", 0,
retCode_REDUCE);
assertEquals(
"REDUCE and completed are a valid inputs to -list-attempt-ids,exit code should be 0",
0, retCode_completed);
verify(job, times(2)).getTaskReports(TaskType.MAP);
verify(job, times(2)).getTaskReports(TaskType.REDUCE);
}
@Test
public void testListAttemptIdsWithInvalidInputs() throws Exception {
JobID jobId = JobID.forName(jobIdStr);
Cluster mockCluster = mock(Cluster.class);
Job job = mock(Job.class);
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
when(mockCluster.getJob(jobId)).thenReturn(job);
int retCode_JOB_SETUP = cli.run(new String[] { "-list-attempt-ids",
jobIdStr, "JOB_SETUP", "running" });
int retCode_JOB_CLEANUP = cli.run(new String[] { "-list-attempt-ids",
jobIdStr, "JOB_CLEANUP", "running" });
int retCode_invalidTaskState = cli.run(new String[] { "-list-attempt-ids",
jobIdStr, "REDUCE", "complete" });
String jobIdStr2 = "job_1015298225799_0016";
int retCode_invalidJobId = cli.run(new String[] { "-list-attempt-ids",
jobIdStr2, "MAP", "running" });
assertEquals("JOB_SETUP is an invalid input,exit code should be -1", -1,
retCode_JOB_SETUP);
assertEquals("JOB_CLEANUP is an invalid input,exit code should be -1", -1,
retCode_JOB_CLEANUP);
assertEquals("complete is an invalid input,exit code should be -1", -1,
retCode_invalidTaskState);
assertEquals("Non existing job id should be skippted with -1", -1,
retCode_invalidJobId);
}
private TaskReport[] getTaskReports(JobID jobId, TaskType type) {
return new TaskReport[] { new TaskReport(), new TaskReport() };
}
@Test
public void testJobKIll() throws Exception {
Cluster mockCluster = mock(Cluster.class);
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
String jobId1 = "job_1234654654_001";
String jobId2 = "job_1234654654_002";
String jobId3 = "job_1234654654_003";
String jobId4 = "job_1234654654_004";
Job mockJob1 = mockJob(mockCluster, jobId1, State.RUNNING);
Job mockJob2 = mockJob(mockCluster, jobId2, State.KILLED);
Job mockJob3 = mockJob(mockCluster, jobId3, State.FAILED);
Job mockJob4 = mockJob(mockCluster, jobId4, State.PREP);
int exitCode1 = cli.run(new String[] { "-kill", jobId1 });
assertEquals(0, exitCode1);
verify(mockJob1, times(1)).killJob();
int exitCode2 = cli.run(new String[] { "-kill", jobId2 });
assertEquals(-1, exitCode2);
verify(mockJob2, times(0)).killJob();
int exitCode3 = cli.run(new String[] { "-kill", jobId3 });
assertEquals(-1, exitCode3);
verify(mockJob3, times(0)).killJob();
int exitCode4 = cli.run(new String[] { "-kill", jobId4 });
assertEquals(0, exitCode4);
verify(mockJob4, times(1)).killJob();
}
private Job mockJob(Cluster mockCluster, String jobId, State jobState)
throws IOException, InterruptedException {
Job mockJob = mock(Job.class);
when(mockCluster.getJob(JobID.forName(jobId))).thenReturn(mockJob);
JobStatus status = new JobStatus(null, 0, 0, 0, 0, jobState,
JobPriority.HIGH, null, null, null, null);
when(mockJob.getStatus()).thenReturn(status);
return mockJob;
}
@Test
public void testGetJobWithoutRetry() throws Exception {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 0);
final Cluster mockCluster = mock(Cluster.class);
when(mockCluster.getJob(any(JobID.class))).thenReturn(null);
CLI cli = new CLI(conf);
cli.cluster = mockCluster;
Job job = cli.getJob(JobID.forName("job_1234654654_001"));
Assert.assertTrue("job is not null", job == null);
}
@Test
public void testGetJobWithRetry() throws Exception {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 1);
final Cluster mockCluster = mock(Cluster.class);
final Job mockJob = Job.getInstance(conf);
when(mockCluster.getJob(any(JobID.class))).thenReturn(
null).thenReturn(mockJob);
CLI cli = new CLI(conf);
cli.cluster = mockCluster;
Job job = cli.getJob(JobID.forName("job_1234654654_001"));
Assert.assertTrue("job is null", job != null);
}
@Test
public void testListEvents() throws Exception {
Cluster mockCluster = mock(Cluster.class);
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
String jobId1 = "job_1234654654_001";
String jobId2 = "job_1234654656_002";
Job mockJob1 = mockJob(mockCluster, jobId1, State.RUNNING);
// Check exiting with non existing job
int exitCode = cli.run(new String[]{"-events", jobId2, "0", "10"});
assertEquals(-1, exitCode);
}
@Test
public void testLogs() throws Exception {
Cluster mockCluster = mock(Cluster.class);
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
String jobId1 = "job_1234654654_001";
String jobId2 = "job_1234654656_002";
Job mockJob1 = mockJob(mockCluster, jobId1, State.SUCCEEDED);
// Check exiting with non existing job
int exitCode = cli.run(new String[]{"-logs", jobId2});
assertEquals(-1, exitCode);
}
}