blob: 5f20f08b4dd4810256667eaf86fac514cfd54659 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.junit.After;
import org.junit.Test;
/**
* Test the refresh feature of QueueManager.
*/
public class TestQueueManagerRefresh {
private static final Log LOG =
LogFactory.getLog(TestQueueManagerRefresh.class);
String queueConfigPath =
System.getProperty("test.build.extraconf", "build/test/extraconf");
File queueConfigFile =
new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
/**
* Remove the configuration file after the test's done.
*/
@After
public void tearDown() {
if (queueConfigFile.exists()) {
queueConfigFile.delete();
}
}
/**
* @return a simple hierarchy of JobQueueInfos
*/
static JobQueueInfo[] getSimpleQueueHierarchy() {
int numQs = 3;
JobQueueInfo[] queues = new JobQueueInfo[numQs];
queues[0] =
newJobQueueInfo(new ArrayList<JobQueueInfo>(), null, "q1",
QueueState.UNDEFINED, null);
queues[1] =
newJobQueueInfo(new ArrayList<JobQueueInfo>(), null, "q1:q2",
QueueState.RUNNING, null);
queues[2] =
newJobQueueInfo(new ArrayList<JobQueueInfo>(), null, "q1:q3",
QueueState.RUNNING, null);
queues[0].addChild(queues[1]);
queues[0].addChild(queues[2]);
return queues;
}
/**
* Test to verify that the refresh of queue properties fails if a new queue is
* added.
*
* @throws Exception
*/
@Test
public void testRefreshWithAddedQueues()
throws Exception {
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
JobQueueInfo newQueue =
newJobQueueInfo(new ArrayList<JobQueueInfo>(), null, "q4",
QueueState.UNDEFINED, null);
queues[0].addChild(newQueue);
// Rewrite the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
testRefreshFailureWithChangeOfHierarchy(qManager);
}
/**
* Test to verify that the refresh of queue properties fails if queues are
* removed.
*
* @throws Exception
*/
@Test
public void testRefreshWithRemovedQueues()
throws Exception {
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
// Remove queue[2]
JobQueueInfo q2 = queues[2];
queues[0].removeChild(q2);
// Rewrite the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
testRefreshFailureWithChangeOfHierarchy(qManager);
}
/**
* @param originalQManager
* @throws Exception
*/
private void testRefreshFailureWithChangeOfHierarchy(
QueueManager originalQManager)
throws Exception {
// Make sure that isHierarchySame returns false.
QueueManager modifiedQueueManager = new QueueManager();
assertFalse("Hierarchy changed after refresh!",
originalQManager.getRoot().isHierarchySameAs(
modifiedQueueManager.getRoot()));
// Refresh the QueueManager and make sure it fails.
try {
originalQManager.refreshQueues(null, null);
fail("Queue-refresh should have failed!");
} catch (Exception e) {
// Refresh failed as expected. Check the error message.
assertTrue(
"Exception message should point to a change in queue hierarchy!",
e.getMessage().contains(
QueueManager.MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY));
}
// Make sure that the old configuration is retained.
List<JobQueueInfo> rootQueues =
originalQManager.getRoot().getJobQueueInfo().getChildren();
assertTrue(rootQueues.size() == 1);
}
/**
* Test to verify that the refresh of queue properties fails if scheduler
* fails to reload itself.
*
* @throws Exception
*/
// @Test
public void testRefreshWithSchedulerFailure()
throws Exception {
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
// No change in configuration. Just Refresh the QueueManager and make sure
// it fails.
try {
qManager.refreshQueues(null,
new MyTaskScheduler().new MyFailingQueueRefresher());
fail("Queue-refresh should have failed!");
} catch (Exception e) {
// Refresh failed as expected. Check the error message.
assertTrue(
"Exception message should point to a refresh-failure in scheduler!",
e.getMessage().contains(
QueueManager.MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE));
}
}
/**
* Test to verify that the refresh of scheduler properties passes smoothly.
*
* @throws Exception
*/
@Test
public void testRefreshOfSchedulerProperties()
throws Exception {
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// Set some scheduler properties
for (JobQueueInfo jqi : queues) {
Properties props = new Properties();
props.setProperty("testing.property", "testing.value."
+ jqi.getQueueName());
jqi.setProperties(props);
}
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
MyTaskScheduler myScheduler = new MyTaskScheduler();
qManager.refreshQueues(null, myScheduler.new MyQueueRefresher());
// Verify that the scheduler props are set correctly by scheduler-refresh.
Map<String, Properties> schedProps = myScheduler.getSchedulerProperties();
for (JobQueueInfo jqi : queues) {
String expectedVal = "testing.value." + jqi.getQueueName();
Properties qProperties = schedProps.get(jqi.getQueueName());
assertNotNull("Properties should not be null for the SchedulerQueue "
+ jqi.getQueueName(), qProperties);
String observedVal = qProperties.getProperty("testing.property");
assertEquals("Properties for the SchedulerQueue " + jqi.getQueueName()
+ " are not reloaded properly!", expectedVal, observedVal);
}
}
/**
* Test to verify that the scheduling information per queue in the
* {@link QueueManager} is retained across queue-refresh.
*
* @throws Exception
*/
@Test
public void testSchedulingInfoAfterRefresh()
throws Exception {
JobQueueInfo[] queues = getSimpleQueueHierarchy();
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
QueueManager qManager = new QueueManager();
// Set some scheduling information for the queues in the QueueManager.
for (String qName : qManager.getLeafQueueNames()) {
qManager.setSchedulerInfo(qName, new String(
"scheduling-information-for-queue-" + qName));
}
qManager.refreshQueues(null, null);
// Verify that the scheduling information is retained across refresh.
for (String qName : qManager.getLeafQueueNames()) {
assertEquals("scheduling-information-for-queue-" + qName,
qManager.getSchedulerInfo(qName));
}
}
static class MyTaskScheduler extends TaskScheduler {
Map<String, Properties> schedulerPropsMap =
new HashMap<String, Properties>();
Map<String, Properties> getSchedulerProperties() {
return schedulerPropsMap;
}
class MyQueueRefresher extends QueueRefresher {
private void updateSchedulerProps(JobQueueInfo jqi) {
LOG.info("Updating properties for SchedulerQueue "
+ jqi.getQueueName());
LOG.info("Putting " + jqi.getProperties() + " in "
+ jqi.getQueueName());
schedulerPropsMap.put(jqi.getQueueName(), jqi.getProperties());
for (JobQueueInfo child : jqi.getChildren()) {
updateSchedulerProps(child);
}
}
@Override
void refreshQueues(List<JobQueueInfo> newRootQueues) {
LOG.info("Refreshing scheduler's properties");
for (JobQueueInfo jqi : newRootQueues) {
updateSchedulerProps(jqi);
}
}
}
class MyFailingQueueRefresher extends QueueRefresher {
@Override
void refreshQueues(List<JobQueueInfo> newRootQueues)
throws Throwable {
throw new IOException("Scheduler cannot refresh the queues!");
}
}
@Override
public List<Task> assignTasks(TaskTracker taskTracker) {
return null;
}
@Override
public Collection<JobInProgress> getJobs(String queueName) {
return null;
}
}
static JobQueueInfo newJobQueueInfo(List<JobQueueInfo> children,
Properties props, String queueName, QueueState state,
String schedulingInfo) {
JobQueueInfo jqi = new JobQueueInfo();
jqi.setChildren(children);
if (props != null) {
jqi.setProperties(props);
}
jqi.setQueueName(queueName);
jqi.setQueueState(state.getStateName());
jqi.setSchedulingInfo(schedulingInfo);
return jqi;
}
}