/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

/**
 * Tests job submission. This test checks:
 *   - basic: job submission via JobClient
 *   - cleanup: the job client crashes while submitting
 *   - invalid job configuration
 *   - invalid memory configuration
 */
public class TestSubmitJob {
  static final Log LOG = LogFactory.getLog(TestSubmitJob.class);

  private static final Path TEST_DIR =
      new Path(System.getProperty("test.build.data", "/tmp"),
          "job-submission-testing");

  /**
   * Tests that jobs with invalid memory requirements are rejected by the
   * JobTracker at submission time.
   */
@SuppressWarnings("deprecation")
@Test
public void testJobWithInvalidMemoryReqs() throws Exception {
MiniMRCluster mrCluster = null;
try {
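      // Cluster-wide defaults (1 GB map / 2 GB reduce) and the JobTracker's
      // upper limits (3 GB map / 4 GB reduce) for per-task memory.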
JobConf jtConf = new JobConf();
jtConf.setLong(MRConfig.MAPMEMORY_MB, 1 * 1024L);
jtConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L);
jtConf.setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 3 * 1024L);
jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 4 * 1024L);
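      // No TaskTrackers are needed: an invalid job is rejected at
      // submission, before any task could run.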
mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
JobConf clusterConf = mrCluster.createJobConf();
// No map-memory configuration
JobConf jobConf = new JobConf(clusterConf);
jobConf.setMemoryForReduceTask(1 * 1024L);
runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L,
"Invalid job requirements.");
// No reduce-memory configuration
jobConf = new JobConf(clusterConf);
jobConf.setMemoryForMapTask(1 * 1024L);
runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT,
"Invalid job requirements.");
// Invalid map-memory configuration
jobConf = new JobConf(clusterConf);
jobConf.setMemoryForMapTask(4 * 1024L);
jobConf.setMemoryForReduceTask(1 * 1024L);
runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L,
"Exceeds the cluster's max-memory-limit.");
      // Invalid reduce-memory configuration
jobConf = new JobConf(clusterConf);
jobConf.setMemoryForMapTask(1 * 1024L);
jobConf.setMemoryForReduceTask(5 * 1024L);
runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
"Exceeds the cluster's max-memory-limit.");
} finally {
if (mrCluster != null) {
mrCluster.shutdown();
}
}
}
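
  /**
   * Runs a SleepJob with zero maps and reduces using the given configuration
   * and asserts that submission fails with a RemoteException whose message
   * names the offending map/reduce memory values and contains expectedMsg.
   */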
@SuppressWarnings("deprecation")
  private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
      long memForReduceTasks, String expectedMsg) throws Exception {
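    // SleepJob arguments: zero maps, zero reduces, zero sleep time; the
    // submission itself is expected to fail.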
String[] args = { "-m", "0", "-r", "0", "-mt", "0", "-rt", "0" };
boolean throwsException = false;
String msg = null;
try {
ToolRunner.run(jobConf, new SleepJob(), args);
} catch (RemoteException re) {
throwsException = true;
msg = re.unwrapRemoteException().getMessage();
}
    assertTrue("Job submission should have failed with a RemoteException",
        throwsException);
    assertNotNull("Exception message should not be null", msg);
String overallExpectedMsg =
"(" + memForMapTasks + " memForMapTasks " + memForReduceTasks
+ " memForReduceTasks): " + expectedMsg;
assertTrue("Observed message - " + msg
+ " - doesn't contain expected message - " + overallExpectedMsg, msg
.contains(overallExpectedMsg));
}
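
  /**
   * Returns a ClientProtocol RPC proxy to the JobTracker, authenticated as
   * the given user.
   */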
@SuppressWarnings("deprecation")
static ClientProtocol getJobSubmitClient(JobConf conf,
UserGroupInformation ugi)
throws IOException {
return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
ClientProtocol.versionID, JobTracker.getAddress(conf), ugi,
conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
}
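
  /**
   * Returns an HDFS ClientProtocol translator connected to the NameNode,
   * authenticated as the given user.
   */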
static org.apache.hadoop.hdfs.protocol.ClientProtocol getDFSClient(
Configuration conf, UserGroupInformation ugi) throws IOException {
return new ClientNamenodeProtocolTranslatorPB(NameNode.getAddress(conf),
conf, ugi);
}

  /**
   * Submits a job and checks that its files are not accessible to other
   * users while it runs, and that the staging directory is cleaned up once
   * it completes.
   */
@SuppressWarnings("deprecation")
@Test
public void testSecureJobExecution() throws Exception {
LOG.info("Testing secure job submission/execution");
MiniMRCluster mr = null;
Configuration conf = new Configuration();
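    // Bring up a freshly formatted single-datanode HDFS cluster.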
final MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null);
try {
FileSystem fs =
TestMiniMRWithDFSWithDistinctUsers.DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return dfs.getFileSystem();
}
});
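      // Create the user homes root, the mapred system dir, and the staging
      // root, all owned by mapred with the sticky bit set (01777).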
TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user", "mapred", "mapred", (short)01777);
TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred", "mapred", "mapred", (short)01777);
TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, conf.get(JTConfig.JT_STAGING_AREA_ROOT),
"mapred", "mapred", (short)01777);
UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(),
1, null, null, MR_UGI);
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
// cleanup
dfs.getFileSystem().delete(TEST_DIR, true);
final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
final Path reduceSignalFile = new Path(TEST_DIR, "reduce-signal");
// create a ugi for user 1
UserGroupInformation user1 =
TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
Path inDir = new Path("/user/input");
Path outDir = new Path("/user/output");
final JobConf job = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job, inDir, outDir, 2, 0,
"test-submit-job", mapSignalFile.toString(),
reduceSignalFile.toString());
job.set(UtilsForTests.getTaskSignalParameter(true),
mapSignalFile.toString());
job.set(UtilsForTests.getTaskSignalParameter(false),
reduceSignalFile.toString());
LOG.info("Submit job as the actual user (" + user1.getUserName() + ")");
final JobClient jClient =
user1.doAs(new PrivilegedExceptionAction<JobClient>() {
public JobClient run() throws IOException {
return new JobClient(job);
}
});
RunningJob rJob = user1.doAs(new PrivilegedExceptionAction<RunningJob>() {
public RunningJob run() throws IOException {
return jClient.submitJob(job);
}
});
JobID id = rJob.getID();
LOG.info("Running job " + id);
// create user2
UserGroupInformation user2 =
TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
      JobConf confOther = mr.createJobConf();
      org.apache.hadoop.hdfs.protocol.ClientProtocol client =
          getDFSClient(confOther, user2);
// try accessing mapred.system.dir/jobid/*
try {
String path = new URI(jt.getSystemDir()).getPath();
LOG.info("Try listing the mapred-system-dir as the user ("
+ user2.getUserName() + ")");
client.getListing(path, HdfsFileStatus.EMPTY_NAME, false);
fail("JobTracker system dir is accessible to others");
} catch (IOException ioe) {
assertTrue(ioe.toString(),
ioe.toString().contains("Permission denied"));
}
// try accessing ~/.staging/jobid/*
JobInProgress jip = jt.getJob(id);
      Path jobSubmitDirPath =
          new Path(jip.getJobConf().get("mapreduce.job.dir"));
try {
LOG.info("Try accessing the job folder for job " + id + " as the user ("
+ user2.getUserName() + ")");
        client.getListing(jobSubmitDirPath.toUri().getPath(),
            HdfsFileStatus.EMPTY_NAME, false);
fail("User's staging folder is accessible to others");
} catch (IOException ioe) {
assertTrue(ioe.toString(),
ioe.toString().contains("Permission denied"));
}
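      // Release the waiting tasks so the job can finish and clean up.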
UtilsForTests.signalTasks(dfs, fs, true, mapSignalFile.toString(),
reduceSignalFile.toString());
// wait for job to be done
UtilsForTests.waitTillDone(jClient);
// check if the staging area is cleaned up
LOG.info("Check if job submit dir is cleanup or not");
assertFalse(fs.exists(jobSubmitDirpath));
} finally {
if (mr != null) {
mr.shutdown();
}
if (dfs != null) {
dfs.shutdown();
}
}
}
}