// blob: 26d00bc7d359addd5a0b72f6bdebc5ba9f4e17d3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.http.HttpStatus;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.VersionInfo;

import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
import static org.junit.Assert.*;
/**
* S3A tests for configuration, especially credentials.
*/
public class ITestS3AConfiguration {
/** Access key ID written into credential providers by these tests. */
private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";

/** Secret key written into credential providers by these tests. */
private static final String EXAMPLE_KEY =
    "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";

/**
 * Text expected in the exception raised when path-style access is
 * attempted against a bucket identified by an access point ARN.
 */
private static final String AP_ILLEGAL_ACCESS =
    "ARN of type accesspoint cannot be passed as a bucket";

/** Configuration of the current test case; set by each test. */
private Configuration conf;

/** Filesystem created by the current test case, if any. */
private S3AFileSystem fs;

private static final Logger LOG =
    LoggerFactory.getLogger(ITestS3AConfiguration.class);

/** Per-test timeout. */
@Rule
public Timeout testTimeout = new Timeout(
    S3ATestConstants.S3A_TEST_TIMEOUT
);

/** Scratch directory used for the keystore files of the credential tests. */
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();

/**
 * Close the filesystem stored in {@link #fs} by the test case, if one was
 * created, so that instances are not leaked from one test to the next.
 * Cleanup is best-effort: a failure to close is logged rather than
 * rethrown so that it cannot mask the outcome of the test itself.
 */
@After
public void closeFileSystem() {
  if (fs == null) {
    return;
  }
  try {
    fs.close();
  } catch (IOException e) {
    LOG.warn("Failed to close test filesystem", e);
  } finally {
    fs = null;
  }
}
/**
 * Test if custom endpoint is picked up.
 * <p>
 * The test expects {@link S3ATestConstants#CONFIGURATION_TEST_ENDPOINT}
 * to be defined in the Configuration
 * describing the endpoint of the bucket to which TEST_FS_S3A_NAME points
 * (i.e. "s3-eu-west-1.amazonaws.com" if the bucket is located in Ireland).
 * Evidently, the bucket has to be hosted in the region denoted by the
 * endpoint for the test to succeed.
 * <p>
 * If the option is not set, a warning is logged and the test returns
 * without checking anything.
 * <p>
 * More info and the list of endpoint identifiers:
 * @see <a href="http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region">endpoint list</a>.
 *
 * @throws Exception on any failure
 */
@Test
public void testEndpoint() throws Exception {
  conf = new Configuration();
  String endpoint = conf.getTrimmed(
      S3ATestConstants.CONFIGURATION_TEST_ENDPOINT, "");
  if (endpoint.isEmpty()) {
    LOG.warn("Custom endpoint test skipped as "
        + S3ATestConstants.CONFIGURATION_TEST_ENDPOINT
        + " config setting was not detected");
    return;
  }
  conf.set(Constants.ENDPOINT, endpoint);
  fs = S3ATestUtils.createTestFileSystem(conf);
  AmazonS3 s3 = fs.getAmazonS3ClientForTesting("test endpoint");
  String endPointRegion = "";
  // Differentiate handling of "s3-" and "s3." based endpoint identifiers
  String[] endpointParts = StringUtils.split(endpoint, '.');
  if (endpointParts.length == 3) {
    // "s3-<region>.<domain>.<tld>": the region follows the 3 character prefix.
    String firstLabel = endpointParts[0];
    // A label such as "s3" has nothing after the prefix; fail with a
    // meaningful message rather than a StringIndexOutOfBoundsException.
    assertTrue("No region found in first label of endpoint " + endpoint,
        firstLabel.length() > 3);
    endPointRegion = firstLabel.substring(3);
  } else if (endpointParts.length == 4) {
    // "s3.<region>.<domain>.<tld>": the region is the second label.
    endPointRegion = endpointParts[1];
  } else {
    fail("Unexpected endpoint: " + endpoint);
  }
  assertEquals("Endpoint config setting and bucket location differ: ",
      endPointRegion, s3.getBucketLocation(fs.getUri().getHost()));
}
/**
 * Configure a proxy at 127.0.0.1:1 and expect creating or listing the
 * filesystem to fail with an {@link AWSClientIOException}.
 */
@Test
public void testProxyConnection() throws Exception {
  useFailFastConfiguration();
  conf.set(Constants.PROXY_HOST, "127.0.0.1");
  conf.setInt(Constants.PROXY_PORT, 1);
  // read the values back so the failure text shows what was actually set
  final String host = conf.get(Constants.PROXY_HOST);
  final String port = conf.get(Constants.PROXY_PORT);
  final String failureText = "when using proxy " + host + ":" + port;
  expectFSCreateFailure(AWSClientIOException.class, conf, failureText);
}
/**
 * Replace {@code conf} with a fresh configuration whose retry settings
 * are lowered so that network problems surface quickly.
 */
protected void useFailFastConfiguration() {
  final Configuration failFast = new Configuration();
  failFast.setInt(Constants.MAX_ERROR_RETRIES, 2);
  failFast.setInt(Constants.RETRY_LIMIT, 2);
  failFast.set(RETRY_INTERVAL, "100ms");
  conf = failFast;
}
/**
 * Expect that creating a filesystem from a configuration, followed by a
 * listing of the root path, fails with the given exception class.
 * The created filesystem, if any, is stored in {@code fs}.
 *
 * @param clazz class of the expected exception
 * @param conf configuration to create the filesystem from
 * @param text text to include in the message if no exception is raised
 * @param <E> type of the expected exception
 * @return the exception intercepted
 * @throws Exception any other exception
 */
private <E extends Throwable> E expectFSCreateFailure(
    Class<E> clazz,
    Configuration conf,
    String text)
    throws Exception {
  return intercept(clazz, () -> {
    final S3AFileSystem created = S3ATestUtils.createTestFileSystem(conf);
    fs = created;
    created.listFiles(new Path("/"), false);
    // only reached if neither call above raised anything
    return "expected failure creating FS " + text + " got " + created;
  });
}
/**
 * Set a proxy port but no proxy host and expect an
 * {@link IllegalArgumentException} whose text names one of the two
 * proxy options.
 */
@Test
public void testProxyPortWithoutHost() throws Exception {
  useFailFastConfiguration();
  conf.unset(Constants.PROXY_HOST);
  conf.setInt(Constants.PROXY_PORT, 1);
  final IllegalArgumentException failure = expectFSCreateFailure(
      IllegalArgumentException.class,
      conf, "Expected a connection error for proxy server");
  final String description = failure.toString();
  final boolean namesProxyOption =
      description.contains(Constants.PROXY_HOST)
          || description.contains(Constants.PROXY_PORT);
  if (!namesProxyOption) {
    // not the validation error being looked for: surface it
    throw failure;
  }
}
/**
 * Set a proxy host but no proxy port and expect filesystem creation to
 * fail with an {@link AWSClientIOException}, first with secure
 * connections enabled and then with them disabled.
 */
@Test
public void testAutomaticProxyPortSelection() throws Exception {
  useFailFastConfiguration();
  conf.unset(Constants.PROXY_PORT);
  conf.set(Constants.PROXY_HOST, "127.0.0.1");
  final String[] secureSettings = {"true", "false"};
  for (String secure : secureSettings) {
    conf.set(Constants.SECURE_CONNECTIONS, secure);
    expectFSCreateFailure(AWSClientIOException.class,
        conf, "Expected a connection error for proxy server");
  }
}
/**
 * Set a proxy username without a password and expect an
 * {@link IllegalArgumentException} naming one of the two options.
 */
@Test
public void testUsernameInconsistentWithPassword() throws Exception {
  useFailFastConfiguration();
  conf.set(Constants.PROXY_HOST, "127.0.0.1");
  conf.setInt(Constants.PROXY_PORT, 1);
  conf.set(Constants.PROXY_USERNAME, "user");
  assertIsProxyUsernameError(
      expectFSCreateFailure(
          IllegalArgumentException.class,
          conf, "Expected a connection error for proxy server"));
}
/**
 * Check that an exception's string form mentions the proxy username or
 * the proxy password option; if it mentions neither, rethrow it.
 *
 * @param e exception to examine
 */
private void assertIsProxyUsernameError(final IllegalArgumentException e) {
  final String description = e.toString();
  if (description.contains(Constants.PROXY_USERNAME)) {
    return;
  }
  if (description.contains(Constants.PROXY_PASSWORD)) {
    return;
  }
  throw e;
}
/**
 * Set a proxy password without a username and expect an
 * {@link IllegalArgumentException} naming one of the two options.
 */
@Test
public void testUsernameInconsistentWithPassword2() throws Exception {
  useFailFastConfiguration();
  conf.set(Constants.PROXY_HOST, "127.0.0.1");
  conf.setInt(Constants.PROXY_PORT, 1);
  conf.set(Constants.PROXY_PASSWORD, "password");
  assertIsProxyUsernameError(
      expectFSCreateFailure(
          IllegalArgumentException.class,
          conf, "Expected a connection error for proxy server"));
}
/**
 * Provision both keys in a local keystore provider, set a different
 * access key in the configuration itself, and expect the login returned
 * by {@code S3AUtils.getAWSAccessKeys} to carry the provider's values.
 */
@Test
public void testCredsFromCredentialProvider() throws Exception {
  // configuration pointing at a keystore in the temporary folder
  final Configuration providerConf = new Configuration();
  final File keystore = tempDir.newFile("test.jks");
  final URI keystoreUri =
      ProviderUtils.nestURIForLocalJavaKeyStoreProvider(keystore.toURI());
  providerConf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
      keystoreUri.toString());

  provisionAccessKeys(providerConf);

  // a conflicting value in the configuration itself
  providerConf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");

  final URI fsUri = new URI("s3a://foobar");
  final S3xLoginHelper.Login login =
      S3AUtils.getAWSAccessKeys(fsUri, providerConf);
  assertEquals("AccessKey incorrect.", EXAMPLE_ID, login.getUser());
  assertEquals("SecretKey incorrect.", EXAMPLE_KEY, login.getPassword());
}
/**
 * Store {@code EXAMPLE_ID} and {@code EXAMPLE_KEY} as the access and
 * secret key entries of the first credential provider of a
 * configuration, then flush that provider.
 *
 * @param conf configuration whose first credential provider is used
 * @throws Exception on any failure
 */
void provisionAccessKeys(final Configuration conf) throws Exception {
  final CredentialProvider firstProvider =
      CredentialProviderFactory.getProviders(conf).get(0);
  final char[] accessKey = EXAMPLE_ID.toCharArray();
  firstProvider.createCredentialEntry(Constants.ACCESS_KEY, accessKey);
  final char[] secretKey = EXAMPLE_KEY.toCharArray();
  firstProvider.createCredentialEntry(Constants.SECRET_KEY, secretKey);
  firstProvider.flush();
}
/**
 * Store only the secret key in a local keystore provider and set the
 * access key in the configuration; expect the login returned by
 * {@code S3AUtils.getAWSAccessKeys} to combine both.
 */
@Test
public void testSecretFromCredentialProviderIDFromConfig() throws Exception {
  // configuration pointing at a keystore in the temporary folder
  final Configuration providerConf = new Configuration();
  final File keystore = tempDir.newFile("test.jks");
  final URI keystoreUri =
      ProviderUtils.nestURIForLocalJavaKeyStoreProvider(keystore.toURI());
  providerConf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
      keystoreUri.toString());

  // the secret goes into the provider...
  final CredentialProvider firstProvider =
      CredentialProviderFactory.getProviders(providerConf).get(0);
  firstProvider.createCredentialEntry(Constants.SECRET_KEY,
      EXAMPLE_KEY.toCharArray());
  firstProvider.flush();

  // ...and the ID into the configuration
  providerConf.set(Constants.ACCESS_KEY, EXAMPLE_ID);

  final URI fsUri = new URI("s3a://foobar");
  final S3xLoginHelper.Login login =
      S3AUtils.getAWSAccessKeys(fsUri, providerConf);
  assertEquals("AccessKey incorrect.", EXAMPLE_ID, login.getUser());
  assertEquals("SecretKey incorrect.", EXAMPLE_KEY, login.getPassword());
}
/**
 * Store only the access key in a local keystore provider and set the
 * secret key in the configuration; expect the login returned by
 * {@code S3AUtils.getAWSAccessKeys} to combine both.
 */
@Test
public void testIDFromCredentialProviderSecretFromConfig() throws Exception {
  // configuration pointing at a keystore in the temporary folder
  final Configuration providerConf = new Configuration();
  final File keystore = tempDir.newFile("test.jks");
  final URI keystoreUri =
      ProviderUtils.nestURIForLocalJavaKeyStoreProvider(keystore.toURI());
  providerConf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
      keystoreUri.toString());

  // the ID goes into the provider...
  final CredentialProvider firstProvider =
      CredentialProviderFactory.getProviders(providerConf).get(0);
  firstProvider.createCredentialEntry(Constants.ACCESS_KEY,
      EXAMPLE_ID.toCharArray());
  firstProvider.flush();

  // ...and the secret into the configuration
  providerConf.set(Constants.SECRET_KEY, EXAMPLE_KEY);

  final URI fsUri = new URI("s3a://foobar");
  final S3xLoginHelper.Login login =
      S3AUtils.getAWSAccessKeys(fsUri, providerConf);
  assertEquals("AccessKey incorrect.", EXAMPLE_ID, login.getUser());
  assertEquals("SecretKey incorrect.", EXAMPLE_KEY, login.getPassword());
}
/**
 * Put an s3a-hosted keystore ahead of a local keystore in the credential
 * provider path and verify that
 * {@code ProviderUtils.excludeIncompatibleCredentialProviders} drops the
 * s3a entry from the configuration it returns, and that credentials
 * provisioned through that filtered configuration can be read back
 * through the original one.
 */
@Test
public void testExcludingS3ACredentialProvider() throws Exception {
  // set up conf to have a cred provider
  final Configuration conf = new Configuration();
  final File file = tempDir.newFile("test.jks");
  final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
      file.toURI());
  conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
      "jceks://s3a/foobar," + jks.toString());

  // first make sure that the s3a based provider is removed.
  // The check must be made on the configuration returned by the filter
  // (the original is expected to still list the s3a provider), and must
  // look for the nested form "jceks://s3a/" which is how the s3a
  // provider appears in the path.
  final Configuration filtered =
      ProviderUtils.excludeIncompatibleCredentialProviders(
          conf, S3AFileSystem.class);
  final String newPath = filtered.get(
      CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH);
  assertNotNull("No provider path in filtered configuration", newPath);
  assertFalse("Provider Path incorrect: " + newPath,
      newPath.contains("jceks://s3a/"));

  // now let's make sure the new path is created by the S3AFileSystem
  // and the integration still works. Let's provision the keys through
  // the altered configuration instance and then try and access them
  // using the original config with the s3a provider in the path.
  provisionAccessKeys(filtered);

  conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
  URI uri2 = new URI("s3a://foobar");
  S3xLoginHelper.Login creds =
      S3AUtils.getAWSAccessKeys(uri2, conf);
  assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser());
  assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword());
}
/**
 * Enable path-style access through the configuration, verify that the
 * S3 client's options report it as switched on, then write and read
 * back a small file.
 * <p>
 * Two failures are tolerated: an {@link AWSS3IOException} with status
 * 301, and an {@link IllegalArgumentException} containing
 * {@code AP_ILLEGAL_ACCESS} when the filesystem's bucket contains
 * "arn:".
 */
@Test
public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
    throws Exception {
  conf = new Configuration();
  conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
  assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
  try {
    fs = S3ATestUtils.createTestFileSystem(conf);
    assertNotNull(fs);
    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("configuration");
    assertNotNull(s3);
    S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
        "clientOptions");
    assertTrue("Expected to find path style access to be switched on!",
        clientOptions.isPathStyleAccess());
    byte[] file = ContractTestUtils.toAsciiByteArray("test file");
    ContractTestUtils.writeAndRead(fs,
        new Path("/path/style/access/testFile"), file, file.length,
        (int) conf.getLongBytes(Constants.FS_S3A_BLOCK_SIZE, file.length),
        false, true);
  } catch (final AWSS3IOException e) {
    LOG.error("Caught exception: ", e);
    // Catch/pass standard path style access behaviour when live bucket
    // isn't in the same region as the s3 client default. See
    // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
    assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
  } catch (final IllegalArgumentException e) {
    // Path style addressing does not work with AP ARNs.
    // If the exception came out of filesystem creation itself, fs is
    // still null: rethrow the original failure instead of hiding it
    // behind a NullPointerException.
    if (fs == null || !fs.getBucket().contains("arn:")) {
      LOG.error("Caught unexpected exception: ", e);
      throw e;
    }
    GenericTestUtils.assertExceptionContains(AP_ILLEGAL_ACCESS, e);
  }
}
/**
 * With no user agent prefix configured, expect the S3 client's
 * configuration to report "Hadoop " followed by the Hadoop version.
 */
@Test
public void testDefaultUserAgent() throws Exception {
  conf = new Configuration();
  fs = S3ATestUtils.createTestFileSystem(conf);
  assertNotNull(fs);
  final AmazonS3 client = fs.getAmazonS3ClientForTesting("User Agent");
  assertNotNull(client);
  final ClientConfiguration clientConf =
      getField(client, ClientConfiguration.class, "clientConfiguration");
  final String expectedPrefix = "Hadoop " + VersionInfo.getVersion();
  assertEquals(expectedPrefix, clientConf.getUserAgentPrefix());
}
/**
 * With the user agent prefix set to "MyApp", expect the S3 client's
 * configuration to report "MyApp, Hadoop " followed by the Hadoop version.
 */
@Test
public void testCustomUserAgent() throws Exception {
  conf = new Configuration();
  conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
  fs = S3ATestUtils.createTestFileSystem(conf);
  assertNotNull(fs);
  final AmazonS3 client = fs.getAmazonS3ClientForTesting("User agent");
  assertNotNull(client);
  final ClientConfiguration clientConf =
      getField(client, ClientConfiguration.class, "clientConfiguration");
  final String expectedPrefix = "MyApp, Hadoop " + VersionInfo.getVersion();
  assertEquals(expectedPrefix, clientConf.getUserAgentPrefix());
}
/**
 * Set {@code REQUEST_TIMEOUT} to "120" and expect the S3 client's
 * configuration to report a request timeout of 120000.
 */
@Test
public void testRequestTimeout() throws Exception {
  conf = new Configuration();
  conf.set(REQUEST_TIMEOUT, "120");
  fs = S3ATestUtils.createTestFileSystem(conf);
  final AmazonS3 client =
      fs.getAmazonS3ClientForTesting("Request timeout (ms)");
  final ClientConfiguration clientConf =
      getField(client, ClientConfiguration.class, "clientConfiguration");
  final String mismatch = "Configured " + REQUEST_TIMEOUT +
      " is different than what AWS sdk configuration uses internally";
  assertEquals(mismatch, 120000, clientConf.getRequestTimeout());
}
/**
 * Take a shared reference to the filesystem's credential list, close that
 * reference and then the filesystem; expect the credentials to be closed
 * with a reference count of zero, and the count to remain zero after the
 * filesystem is closed a second time.
 */
@Test
@SuppressWarnings("deprecation")
public void testCloseIdempotent() throws Throwable {
  conf = new Configuration();
  fs = S3ATestUtils.createTestFileSystem(conf);
  final AWSCredentialProviderList shared =
      fs.shareCredentials("testCloseIdempotent");

  // release the shared reference, then the filesystem's own
  shared.close();
  fs.close();
  assertTrue("Closing FS didn't close credentials " + shared,
      shared.isClosed());
  assertEquals("refcount not zero in " + shared,
      0, shared.getRefCount());

  // a second close must leave the numbers unchanged
  fs.close();
  assertEquals("refcount not zero in " + shared,
      0, shared.getRefCount());
}
/**
 * With the buffer directory option unset, expect the filesystem to still
 * create a temporary file for writing; the file is deleted afterwards.
 */
@Test
public void testDirectoryAllocatorDefval() throws Throwable {
  conf = new Configuration();
  conf.unset(Constants.BUFFER_DIR);
  fs = S3ATestUtils.createTestFileSystem(conf);
  final File scratch = fs.createTmpFileForWrite("out-", 1024, conf);
  final boolean present = scratch.exists();
  assertTrue("not found: " + scratch, present);
  scratch.delete();
}
/**
 * Configure two buffer directories and expect two successive temporary
 * files to be created under different parent directories (round robin).
 */
@Test
public void testDirectoryAllocatorRR() throws Throwable {
  final File firstDir = GenericTestUtils.getRandomizedTestDir();
  final File secondDir = GenericTestUtils.getRandomizedTestDir();
  firstDir.mkdirs();
  secondDir.mkdirs();

  conf = new Configuration();
  final String bufferDirs = firstDir + ", " + secondDir;
  conf.set(Constants.BUFFER_DIR, bufferDirs);
  fs = S3ATestUtils.createTestFileSystem(conf);

  // each file is deleted as soon as it has been allocated
  final File firstFile = fs.createTmpFileForWrite("out-", 1024, conf);
  firstFile.delete();
  final File secondFile = fs.createTmpFileForWrite("out-", 1024, conf);
  secondFile.delete();

  assertNotEquals("round robin not working",
      firstFile.getParent(), secondFile.getParent());
}
/**
 * Create the filesystem as a test user named "alice" and expect that name
 * to be the filesystem's username, and the owner and group of the status
 * of the root path.
 */
@Test
public void testUsernameFromUGI() throws Throwable {
  final String alice = "alice";
  final String[] groups = {"users", "administrators"};
  final UserGroupInformation testUser =
      UserGroupInformation.createUserForTesting(alice, groups);
  conf = new Configuration();
  // the cast selects the PrivilegedExceptionAction overload of doAs
  fs = testUser.doAs(
      (PrivilegedExceptionAction<S3AFileSystem>)
          () -> S3ATestUtils.createTestFileSystem(conf));
  assertEquals("username", alice, fs.getUsername());
  final FileStatus rootStatus = fs.getFileStatus(new Path("/"));
  assertEquals("owner in " + rootStatus, alice, rootStatus.getOwner());
  assertEquals("group in " + rootStatus, alice, rootStatus.getGroup());
}
/**
 * Read a field of an object reflectively, forcing access. The test fails
 * if the value read is null or is not an instance of the expected type.
 *
 * @param target object to read
 * @param fieldType expected type of the field, and the return type
 * @param fieldName name of the field to read
 * @param <T> expected type of the field
 * @return the value of the field, cast to {@code fieldType}
 * @throws IllegalAccessException if access not allowed
 */
private static <T> T getField(Object target, Class<T> fieldType,
    String fieldName) throws IllegalAccessException {
  final Object value = FieldUtils.readField(target, fieldName, true);

  final String unreadable = String.format(
      "Could not read field named %s in object with class %s.", fieldName,
      target.getClass().getName());
  assertNotNull(unreadable, value);

  final Class<?> actualType = value.getClass();
  final String wrongType = String.format(
      "Unexpected type found for field named %s, expected %s, actual %s.",
      fieldName, fieldType.getName(), actualType.getName());
  assertTrue(wrongType, fieldType.isAssignableFrom(actualType));

  return fieldType.cast(value);
}
/**
 * Set the bucket option "propagation" for the test filesystem's bucket
 * and expect the created filesystem's configuration to hold
 * "fs.s3a.propagation" with the same value.
 */
@Test
public void testConfOptionPropagationToFS() throws Exception {
  final Configuration config = new Configuration();
  // the bucket is the host of the configured test filesystem URI
  final URI testFsUri = new URI(config.getTrimmed(TEST_FS_S3A_NAME, ""));
  setBucketOption(config, testFsUri.getHost(), "propagation", "propagated");
  fs = S3ATestUtils.createTestFileSystem(config);
  assertOptionEquals(fs.getConf(), "fs.s3a.propagation", "propagated");
}
/**
 * Check the signer override of the AWS client configurations built by
 * {@code S3AUtils.createAwsConf} for the S3 and STS service identifiers,
 * first with only the S3-specific signing algorithm set, then with both
 * the base and the S3-specific signing algorithms set.
 */
@Test(timeout = 10_000L)
public void testS3SpecificSignerOverride() throws IOException {
  final String signerOverride = "testSigner";
  final String s3SignerOverride = "testS3Signer";

  // Default SIGNING_ALGORITHM, overridden for S3 only
  final Configuration s3Only = new Configuration();
  s3Only.set(SIGNING_ALGORITHM_S3, s3SignerOverride);
  final ClientConfiguration s3FromS3Only = S3AUtils
      .createAwsConf(s3Only, "dontcare", AWS_SERVICE_IDENTIFIER_S3);
  Assert.assertEquals(s3SignerOverride, s3FromS3Only.getSignerOverride());
  final ClientConfiguration stsFromS3Only = S3AUtils
      .createAwsConf(s3Only, "dontcare", AWS_SERVICE_IDENTIFIER_STS);
  Assert.assertNull(stsFromS3Only.getSignerOverride());

  // Configured base SIGNING_ALGORITHM, overridden for S3 only
  final Configuration baseAndS3 = new Configuration();
  baseAndS3.set(SIGNING_ALGORITHM, signerOverride);
  baseAndS3.set(SIGNING_ALGORITHM_S3, s3SignerOverride);
  final ClientConfiguration s3FromBoth = S3AUtils
      .createAwsConf(baseAndS3, "dontcare", AWS_SERVICE_IDENTIFIER_S3);
  Assert.assertEquals(s3SignerOverride, s3FromBoth.getSignerOverride());
  final ClientConfiguration stsFromBoth = S3AUtils
      .createAwsConf(baseAndS3, "dontcare", AWS_SERVICE_IDENTIFIER_STS);
  Assert.assertEquals(signerOverride, stsFromBoth.getSignerOverride());
}
}