blob: 6d92d0de730d6166aae676fae244132a0fd822c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.net.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Tests job end notification
*
*/
public class TestJobEndNotifier extends JobEndNotifier {
//Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS
private void testNumRetries(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "0");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "10");
setConf(conf);
Assert.assertTrue("Expected numTries to be 0, but was " + numTries,
numTries == 0 );
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "1");
setConf(conf);
Assert.assertTrue("Expected numTries to be 1, but was " + numTries,
numTries == 1 );
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "20");
setConf(conf);
Assert.assertTrue("Expected numTries to be 11, but was " + numTries,
numTries == 11 ); //11 because number of _retries_ is 10
}
//Test maximum retry interval is capped by
//MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL
private void testWaitInterval(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval,
waitInterval == 1);
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
waitInterval == 5);
//Test negative numbers are set to default
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
waitInterval == 5);
}
private void testProxyConfiguration(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
setConf(conf);
Assert.assertTrue("Proxy shouldn't be set because port wasn't specified",
proxyToUse.type() == Proxy.Type.DIRECT);
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:someport");
setConf(conf);
Assert.assertTrue("Proxy shouldn't be set because port wasn't numeric",
proxyToUse.type() == Proxy.Type.DIRECT);
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been set but wasn't ",
proxyToUse.toString().equals("HTTP @ somehost:1000"));
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "socks@somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been socks but wasn't ",
proxyToUse.toString().equals("SOCKS @ somehost:1000"));
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "SOCKS@somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been socks but wasn't ",
proxyToUse.toString().equals("SOCKS @ somehost:1000"));
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "sfafn@somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been http but wasn't ",
proxyToUse.toString().equals("HTTP @ somehost:1000"));
}
/**
* Test that setting parameters has the desired effect
*/
@Test
public void checkConfiguration() {
Configuration conf = new Configuration();
testNumRetries(conf);
testWaitInterval(conf);
testProxyConfiguration(conf);
}
protected int notificationCount = 0;
@Override
protected boolean notifyURLOnce() {
boolean success = super.notifyURLOnce();
notificationCount++;
return success;
}
//Check retries happen as intended
@Test
public void testNotifyRetries() throws InterruptedException {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000");
JobReport jobReport = Mockito.mock(JobReport.class);
long startTime = System.currentTimeMillis();
this.notificationCount = 0;
this.setConf(conf);
this.notify(jobReport);
long endTime = System.currentTimeMillis();
Assert.assertEquals("Only 3 retries were expected but was : "
+ this.notificationCount, this.notificationCount, 3);
Assert.assertTrue("Should have taken more than 9 seconds it took "
+ (endTime - startTime), endTime - startTime > 9000);
}
}