/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.Mockito;

public class TestSaslDataTransfer extends SaslDataTransferTestCase {
private static final int BLOCK_SIZE = 4096;
private static final int NUM_BLOCKS = 3;
private static final Path PATH = new Path("/file1");
private MiniDFSCluster cluster;
private FileSystem fs;

@Rule
public ExpectedException exception = ExpectedException.none();

@Rule
public Timeout timeout = new Timeout(60000);

@After
public void shutdown() {
IOUtils.cleanupWithLogger(null, fs);
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
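
/**
 * Tests data transfer with the client requesting the authentication QOP
 * while the cluster offers all three QOP levels.
 */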
@Test
public void testAuthentication() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
doTest(clientConf);
}
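
/**
 * Tests data transfer with the client requesting the integrity QOP while
 * the cluster offers all three QOP levels.
 */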
@Test
public void testIntegrity() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity");
doTest(clientConf);
}
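
/**
 * Tests data transfer with the client requesting the privacy QOP while the
 * cluster offers all three QOP levels.
 */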
@Test
public void testPrivacy() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
doTest(clientConf);
}
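
/**
 * Tests that data transfer fails when the client and the DataNodes share
 * no common QOP: the cluster offers only privacy while the client requests
 * only authentication.
 */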
@Test
public void testClientAndServerDoNotHaveCommonQop() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("privacy");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
exception.expect(IOException.class);
exception.expectMessage("could only be written to 0");
doTest(clientConf);
}
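
/**
 * Tests that a client with no SASL data transfer protection configured
 * fails against a SASL-enabled cluster, and that the DataNode logs the
 * rejected handshake.
 */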
@Test
public void testServerSaslNoClientSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
"authentication,integrity,privacy");
// Set short retry timeouts so this test runs faster
clusterConf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "");
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LoggerFactory.getLogger(DataNode.class));
try {
doTest(clientConf);
Assert.fail("Should fail if SASL data transfer protection is not " +
"configured or not supported in client");
} catch (IOException e) {
GenericTestUtils.assertMatches(e.getMessage(),
"could only be written to 0");
} finally {
logs.stopCapturing();
}
GenericTestUtils.assertMatches(logs.getOutput(),
"Failed to read expected SASL data transfer protection " +
"handshake from client at");
}
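
/**
 * Tests that a secure DataNode refuses to start when SASL data transfer
 * protection is not configured.
 */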
@Test
public void testDataNodeAbortsIfNoSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("");
exception.expect(RuntimeException.class);
exception.expectMessage("Cannot start secure DataNode");
startCluster(clusterConf);
}
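
/**
 * Tests that a SASL-enabled DataNode refuses to start when the HTTP policy
 * is not HTTPS_ONLY.
 */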
@Test
public void testDataNodeAbortsIfNotHttpsOnly() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("authentication");
clusterConf.set(DFS_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTP_AND_HTTPS.name());
exception.expect(RuntimeException.class);
exception.expectMessage("Cannot start secure DataNode");
startCluster(clusterConf);
}
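
/**
 * Tests that a secure DataNode starts when the privacy QOP is combined
 * with the HTTPS_ONLY policy.
 */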
@Test
public void testDataNodeStartIfHttpsQopPrivacy() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("privacy");
clusterConf.set(DFS_HTTP_POLICY_KEY,
Policy.HTTPS_ONLY.name());
startCluster(clusterConf);
}
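
/**
 * Tests that data transfer succeeds without SASL when secure port checks
 * are explicitly ignored for testing.
 */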
@Test
public void testNoSaslAndSecurePortsIgnored() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("");
clusterConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
startCluster(clusterConf);
doTest(clusterConf);
}

/**
* Tests DataTransferProtocol with the given client configuration.
*
* @param conf client configuration
* @throws IOException if there is an I/O error
*/
private void doTest(HdfsConfiguration conf) throws IOException {
fs = FileSystem.get(cluster.getURI(), conf);
FileSystemTestHelper.createFile(fs, PATH, NUM_BLOCKS, BLOCK_SIZE);
assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
DFSTestUtil.readFile(fs, PATH).getBytes("UTF-8"));
BlockLocation[] blockLocations = fs.getFileBlockLocations(PATH, 0,
Long.MAX_VALUE);
assertNotNull(blockLocations);
assertEquals(NUM_BLOCKS, blockLocations.length);
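// The cluster is started with three DataNodes, so every block should
// report three replica hosts.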
for (BlockLocation blockLocation: blockLocations) {
assertNotNull(blockLocation.getHosts());
assertEquals(3, blockLocation.getHosts().length);
}
}

/**
* Starts a cluster with the given configuration.
*
* @param conf cluster configuration
* @throws IOException if there is an I/O error
*/
private void startCluster(HdfsConfiguration conf) throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
}

/**
* Verifies that peerFromSocketAndKey honors socket read timeouts.
*/
@Test(timeout=60000)
public void testPeerFromSocketAndKeyReadTimeout() throws Exception {
HdfsConfiguration conf = createSecureConfig(
"authentication,integrity,privacy");
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
SaslDataTransferClient saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuth);
DatanodeID fakeDatanodeId = new DatanodeID("127.0.0.1", "localhost",
"beefbeef-beef-beef-beef-beefbeefbeef", 1, 2, 3, 4);
DataEncryptionKeyFactory dataEncKeyFactory =
new DataEncryptionKeyFactory() {
@Override
public DataEncryptionKey newDataEncryptionKey() {
return new DataEncryptionKey(123, "456", new byte[8],
new byte[8], 1234567, "fakeAlgorithm");
}
};
ServerSocket serverSocket = null;
Socket socket = null;
try {
serverSocket = new ServerSocket(0, -1);
socket = new Socket(serverSocket.getInetAddress(),
serverSocket.getLocalPort());
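// The server side never reads or writes, so the client's SASL handshake
// read should hit the short socket read timeout (1 ms) passed as the
// final argument below.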
Peer peer = DFSUtilClient.peerFromSocketAndKey(saslClient, socket,
dataEncKeyFactory, new Token(), fakeDatanodeId, 1);
peer.close();
Assert.fail("Expected DFSClient#peerFromSocketAndKey to time out.");
} catch (SocketTimeoutException e) {
GenericTestUtils.assertExceptionContains("Read timed out", e);
} finally {
IOUtils.cleanupWithLogger(null, socket, serverSocket);
}
}

/**
 * Verifies that SaslDataTransferClient#checkTrustAndSend does not trust a
 * partially trusted channel.
 */
@Test
public void testSaslDataTransferWithTrustedServerUntrustedClient() throws
Exception {
HdfsConfiguration conf = createSecureConfig(
"authentication,integrity,privacy");
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
TrustedChannelResolver trustedChannelResolver = new
TrustedChannelResolver() {
@Override
public boolean isTrusted() {
return true;
}
@Override
public boolean isTrusted(InetAddress peerAddress) {
return false;
}
};
SaslDataTransferClient saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
trustedChannelResolver, fallbackToSimpleAuth);
ServerSocket serverSocket = null;
Socket socket = null;
DataEncryptionKeyFactory dataEncryptionKeyFactory = null;
try {
serverSocket = new ServerSocket(10002, 10);
socket = new Socket(serverSocket.getInetAddress(),
serverSocket.getLocalPort());
dataEncryptionKeyFactory = mock(DataEncryptionKeyFactory.class);
Mockito.when(dataEncryptionKeyFactory.newDataEncryptionKey())
.thenThrow(new IOException("Encryption enabled"));
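// Only one end of the channel is trusted, so checkTrustAndSend falls back
// to SASL negotiation, which requests a data encryption key and triggers
// the stubbed exception above.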
saslClient.socketSend(socket, null, null, dataEncryptionKeyFactory,
null, null);
Assert.fail("Expected IOException from "
+ "SaslDataTransferClient#checkTrustAndSend");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Encryption enabled", e);
verify(dataEncryptionKeyFactory, times(1)).newDataEncryptionKey();
} finally {
IOUtils.cleanupWithLogger(null, socket, serverSocket);
}
}
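
/**
 * Verifies that SaslDataTransferClient#checkTrustAndSend does not trust a
 * channel that is untrusted on both ends, so SASL negotiation requests an
 * encryption key.
 */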
@Test
public void testSaslDataTransferWithUntrustedServerUntrustedClient() throws
Exception {
HdfsConfiguration conf = createSecureConfig(
"authentication,integrity,privacy");
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
TrustedChannelResolver trustedChannelResolver = new
TrustedChannelResolver() {
@Override
public boolean isTrusted() {
return false;
}
@Override
public boolean isTrusted(InetAddress peerAddress) {
return false;
}
};
SaslDataTransferClient saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
trustedChannelResolver, fallbackToSimpleAuth);
ServerSocket serverSocket = null;
Socket socket = null;
DataEncryptionKeyFactory dataEncryptionKeyFactory = null;
try {
serverSocket = new ServerSocket(10002, 10);
socket = new Socket(serverSocket.getInetAddress(),
serverSocket.getLocalPort());
dataEncryptionKeyFactory = mock(DataEncryptionKeyFactory.class);
Mockito.when(dataEncryptionKeyFactory.newDataEncryptionKey())
.thenThrow(new IOException("Encryption enabled"));
saslClient.socketSend(socket, null, null, dataEncryptionKeyFactory,
null, null);
Assert.fail("Expected IOException from "
+ "SaslDataTransferClient#checkTrustAndSend");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Encryption enabled", e);
verify(dataEncryptionKeyFactory, times(1)).newDataEncryptionKey();
} finally {
IOUtils.cleanupWithLogger(null, socket, serverSocket);
}
}
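
/**
 * Verifies that SaslDataTransferClient#checkTrustAndSend trusts a channel
 * that is trusted on both ends, so SASL is skipped and no encryption key
 * is requested.
 */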
@Test
public void testSaslDataTransferWithTrustedServerTrustedClient() throws
Exception {
HdfsConfiguration conf = createSecureConfig(
"authentication,integrity,privacy");
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
TrustedChannelResolver trustedChannelResolver = new
TrustedChannelResolver() {
@Override
public boolean isTrusted() {
return true;
}
@Override
public boolean isTrusted(InetAddress peerAddress) {
return true;
}
};
SaslDataTransferClient saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
trustedChannelResolver, fallbackToSimpleAuth);
ServerSocket serverSocket = null;
Socket socket = null;
DataEncryptionKeyFactory dataEncryptionKeyFactory = null;
try {
serverSocket = new ServerSocket(10002, 10);
socket = new Socket(serverSocket.getInetAddress(),
serverSocket.getLocalPort());
dataEncryptionKeyFactory = mock(DataEncryptionKeyFactory.class);
Mockito.when(dataEncryptionKeyFactory.newDataEncryptionKey())
.thenThrow(new IOException("Encryption enabled"));
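// Both ends of the channel are trusted, so SASL is skipped and the key
// factory must never be invoked.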
saslClient.socketSend(socket, null, null, dataEncryptionKeyFactory,
null, null);
verify(dataEncryptionKeyFactory, times(0)).newDataEncryptionKey();
} finally {
IOUtils.cleanupWithLogger(null, socket, serverSocket);
}
}
}