blob: d5d62f2cae92cb572a746fe96bdcfdfaaef52300 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.fs.s3a.auth.delegation;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MockJob;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyInt;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.probeForAssumedRoleARN;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*;
import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.assertSecurityEnabled;
import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.closeUserFileSystems;
/**
* Submit a job with S3 delegation tokens.
*
* YARN will not collect DTs unless it is running secure, and turning
* security on complicates test setup "significantly".
* Specifically: bits of MR refuse to work on a local FS unless the
* native libraries are loaded and it can use lower level POSIX APIs
* for creating files and directories with specific permissions.
* In production, this is a good thing. In tests, this is not.
*
* To address this, Job to YARN communications are mocked.
* The client-side job submission is as normal, but the implementation
* of org.apache.hadoop.mapreduce.protocol.ClientProtocol is mock.
*
* It's still an ITest though, as it does use S3A as the source and
* dest so as to collect delegation tokens.
*
* It also uses the open street map open bucket, so that there's an extra
* S3 URL in job submission which can be added as a job resource.
* This is needed to verify that job resources have their tokens extracted
* too.
*/
@RunWith(Parameterized.class)
public class ITestDelegatedMRJob extends AbstractDelegationIT {

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestDelegatedMRJob.class);

  /**
   * Created in static {@link #setupCluster()} call;
   * terminated in {@link #teardownCluster()}.
   */
  @SuppressWarnings("StaticNonFinalField")
  private static MiniKerberizedHadoopCluster cluster;

  /** Name of the parameterized run: first element of each parameter tuple. */
  private final String name;

  /** Delegation token binding enabled in {@link #createConfiguration()}. */
  private final String tokenBinding;

  /** Token kind used when looking up tokens in the submitted credentials. */
  private final Text tokenKind;

  /**
   * Created in test setup.
   */
  private MiniMRYarnCluster yarn;

  /** Destination directory of the job; deleted in teardown when non-null. */
  private Path destPath;

  /** Extra S3A path which is added to the job as a cache file. */
  private static final Path EXTRA_JOB_RESOURCE_PATH
      = new Path("s3a://osm-pds/planet/planet-latest.orc");

  /** URI form of the extra job resource path. */
  public static final URI jobResource = EXTRA_JOB_RESOURCE_PATH.toUri();

  /**
   * Test array for parameterized test runs.
   * Each tuple is (name, token binding, token kind).
   * @return a list of parameter tuples.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {"session", DELEGATION_TOKEN_SESSION_BINDING, SESSION_TOKEN_KIND},
        {"full", DELEGATION_TOKEN_FULL_CREDENTIALS_BINDING, FULL_TOKEN_KIND},
        {"role", DELEGATION_TOKEN_ROLE_BINDING, ROLE_TOKEN_KIND},
    });
  }

  /**
   * Construct a test instance for one parameter tuple.
   * @param name name of the run.
   * @param tokenBinding delegation token binding to enable.
   * @param tokenKind kind of token expected in the submitted credentials.
   */
  public ITestDelegatedMRJob(String name, String tokenBinding, Text tokenKind) {
    this.name = name;
    this.tokenBinding = tokenBinding;
    this.tokenKind = tokenKind;
  }

  /**
   * Set up the clusters.
   * The session-test assumption is evaluated first, before
   * the Kerberized cluster is deployed.
   * @throws Exception failure to deploy the cluster.
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    JobConf conf = new JobConf();
    assumeSessionTestsEnabled(conf);
    disableFilesystemCaching(conf);
    cluster = deployService(conf, new MiniKerberizedHadoopCluster());
  }

  /**
   * Tear down the cluster.
   * @throws Exception failure to terminate the cluster.
   */
  @AfterClass
  public static void teardownCluster() throws Exception {
    cluster = terminateService(cluster);
  }

  /**
   * Create the test configuration: the parent configuration wrapped in a
   * {@link YarnConfiguration}, patched with the cluster's YARN bindings,
   * short resource manager connection settings, an empty endpoint for the
   * bucket of the extra job resource and the delegation token binding
   * of this parameterized run.
   * @return the configuration to use in the test.
   */
  @Override
  protected YarnConfiguration createConfiguration() {
    Configuration parent = super.createConfiguration();
    YarnConfiguration conf = new YarnConfiguration(parent);
    cluster.patchConfigWithYARNBindings(conf);

    // fail fairly fast
    // NOTE(review): the max wait (100ms) is shorter than the retry
    // interval (10s); presumably this limits the client to a single
    // connection attempt. Confirm this is the intent.
    conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
        100);
    conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
        10_000);

    String host = jobResource.getHost();
    // and fix to the main endpoint if the caller has moved
    conf.set(
        String.format("fs.s3a.bucket.%s.endpoint", host), "");

    // set up DTs
    enableDelegationTokens(conf, tokenBinding);
    return conf;
  }

  /**
   * Get the configuration, cast to the type which
   * {@link #createConfiguration()} returns.
   * @return the test configuration.
   */
  @Override
  protected YarnConfiguration getConfiguration() {
    return (YarnConfiguration) super.getConfiguration();
  }

  /**
   * Log in the cluster principal, run the superclass setup, then
   * deploy a single-node {@link MiniMRYarnCluster}.
   * For the role binding, the assumed role ARN is probed for first.
   * @throws Exception setup failure.
   */
  @Override
  public void setup() throws Exception {
    cluster.loginPrincipal();
    super.setup();
    Configuration conf = getConfiguration();

    if (DELEGATION_TOKEN_ROLE_BINDING.equals(tokenBinding)) {
      // get the ARN or skip the test
      probeForAssumedRoleARN(getConfiguration());
    }

    // filesystems are cached across the test so that
    // instrumentation fields can be asserted on

    UserGroupInformation.setConfiguration(conf);
    assertSecurityEnabled();

    LOG.info("Starting MiniMRCluster");
    yarn = deployService(conf,
        new MiniMRYarnCluster("ITestDelegatedMRJob", 1));
  }

  /**
   * Delete the destination path, terminate the YARN cluster, run the
   * superclass teardown and close the filesystems of the current user.
   * Every step is attempted even if an earlier one throws, so that a
   * failure to delete the destination path does not leave the YARN
   * cluster running or the filesystems open.
   * @throws Exception any failure raised by one of the teardown steps.
   */
  @Override
  public void teardown() throws Exception {
    describe("Teardown operations");
    try {
      S3AFileSystem fs = getFileSystem();
      if (fs != null && destPath != null) {
        fs.delete(destPath, true);
      }
    } finally {
      try {
        yarn = terminateService(yarn);
      } finally {
        try {
          super.teardown();
        } finally {
          closeUserFileSystems(UserGroupInformation.getCurrentUser());
        }
      }
    }
  }

  /**
   * Get the test timeout in seconds.
   * @return the test timeout as set in system properties or the default.
   */
  protected int getTestTimeoutSeconds() {
    return getTestPropertyInt(new Configuration(),
        KEY_TEST_TIMEOUT,
        SCALE_TEST_TIMEOUT_SECONDS);
  }

  /**
   * Get the test timeout in milliseconds.
   * @return {@link #getTestTimeoutSeconds()} multiplied by 1000.
   */
  @Override
  protected int getTestTimeoutMillis() {
    return getTestTimeoutSeconds() * 1000;
  }

  /**
   * Look up the status of the extra job resource and
   * assert that it is reported as encrypted.
   * @throws Throwable failure of the lookup or the assertion.
   */
  @Test
  public void testCommonCrawlLookup() throws Throwable {
    FileSystem resourceFS = EXTRA_JOB_RESOURCE_PATH.getFileSystem(
        getConfiguration());
    FileStatus status = resourceFS.getFileStatus(EXTRA_JOB_RESOURCE_PATH);
    LOG.info("Extra job resource is {}", status);
    assertTrue("Not encrypted: " + status, status.isEncrypted());
  }

  /**
   * Submit a mock word count job whose source, destination and cache file
   * are all S3A paths, then look up a token of the expected kind for each
   * of the three filesystems in the credentials submitted with the job.
   * @throws Exception failure of the submission or of a lookup.
   */
  @Test
  public void testJobSubmissionCollectsTokens() throws Exception {
    describe("Mock Job test");
    JobConf conf = new JobConf(getConfiguration());

    // the input here is the landsat file; which lets
    // us differentiate source URI from dest URI
    Path input = new Path(DEFAULT_CSVTEST_FILE);
    final FileSystem sourceFS = input.getFileSystem(conf);

    // output is in the writable test FS.
    final S3AFileSystem fs = getFileSystem();

    destPath = path(getMethodName());
    fs.delete(destPath, true);
    fs.mkdirs(destPath);
    Path output = new Path(destPath, "output/");
    output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory());

    MockJob job = new MockJob(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    job.setMaxMapAttempts(1);
    job.setMaxReduceAttempts(1);

    // and a file for a different store.
    // This is to actually stress the terasort code for which
    // the yarn ResourceLocalizationService was having problems with
    // fetching resources from.
    URI partitionUri = new URI(EXTRA_JOB_RESOURCE_PATH.toString() +
        "#_partition.lst");
    job.addCacheFile(partitionUri);

    describe("Executing Mock Job Submission to %s", output);

    job.submit();
    final JobStatus status = job.getStatus();
    assertEquals("not a mock job",
        MockJob.NAME, status.getSchedulingInfo());
    assertEquals("Job State",
        JobStatus.State.RUNNING, status.getState());

    final Credentials submittedCredentials =
        requireNonNull(job.getSubmittedCredentials(),
            "job submitted credentials");
    final Collection<Token<? extends TokenIdentifier>> tokens
        = submittedCredentials.getAllTokens();

    // log all the tokens for debugging failed test runs
    LOG.info("Token Count = {}", tokens.size());
    for (Token<? extends TokenIdentifier> token : tokens) {
      LOG.info("{}", token);
    }

    // verify the source token exists
    lookupToken(submittedCredentials, sourceFS.getUri(), tokenKind);
    // look up the destination token
    lookupToken(submittedCredentials, fs.getUri(), tokenKind);
    // and the token of the extra job resource
    lookupToken(submittedCredentials,
        EXTRA_JOB_RESOURCE_PATH.getFileSystem(conf).getUri(), tokenKind);
  }
}