blob: 2de7f412b7de5455621fbc4720e895390d02f761 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@RunWith(Parameterized.class)
@Category({ MiscTests.class, LargeTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
private static DistributedFileSystem FS;
private static EventLoopGroup EVENT_LOOP_GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String USERNAME;
private static String PRINCIPAL;
private static String HTTP_PRINCIPAL;
private static String TEST_KEY_NAME = "test_key";
@Rule
public TestName name = new TestName();
@Parameter(0)
public String protection;
@Parameter(1)
public String encryptionAlgorithm;
@Parameter(2)
public String cipherSuite;
@Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
public static Iterable<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
}
}
}
return params;
}
private static void setUpKeyProvider(Configuration conf) throws Exception {
URI keyProviderUri =
new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
keyProvider.flush();
keyProvider.close();
}
/**
* Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
* keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
*/
private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
Properties conf = MiniKdc.createConf();
conf.put(MiniKdc.DEBUG, true);
MiniKdc kdc = null;
File dir = null;
// There is time lag between selecting a port and trying to bind with it. It's possible that
// another service captures the port in between which'll result in BindException.
boolean bindException;
int numTries = 0;
do {
try {
bindException = false;
dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
kdc = new MiniKdc(conf, dir);
kdc.start();
} catch (BindException e) {
FileUtils.deleteDirectory(dir); // clean directory
numTries++;
if (numTries == 3) {
LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
throw e;
}
LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
bindException = true;
}
} while (bindException);
System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
keytabFile.getAbsolutePath());
return kdc;
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
KDC = setupMiniKdc(KEYTAB_FILE);
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
PRINCIPAL = USERNAME + "/" + HOST;
HTTP_PRINCIPAL = "HTTP/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
setUpKeyProvider(UTIL.getConfiguration());
HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
}
@AfterClass
public static void tearDownAfterClass() throws IOException, InterruptedException {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
}
if (KDC != null) {
KDC.stop();
}
shutdownMiniDFSCluster();
}
private Path testDirOnTestFs;
private Path entryptionTestDirOnTestFs;
private void createEncryptionZone() throws Exception {
Method method =
DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
}
@Before
public void setUp() throws Exception {
UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
} else {
UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
}
if (StringUtils.isBlank(encryptionAlgorithm)) {
UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
} else {
UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
}
if (StringUtils.isBlank(cipherSuite)) {
UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
} else {
UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
}
startMiniDFSCluster(3);
FS = CLUSTER.getFileSystem();
testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
FS.mkdirs(testDirOnTestFs);
entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
FS.mkdirs(entryptionTestDirOnTestFs);
createEncryptionZone();
}
@After
public void tearDown() throws IOException {
shutdownMiniDFSCluster();
}
private Path getTestFile() {
return new Path(testDirOnTestFs, "test");
}
private Path getEncryptionTestFile() {
return new Path(entryptionTestDirOnTestFs, "test");
}
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
}
@Test
public void test() throws IOException, InterruptedException, ExecutionException {
test(getTestFile());
test(getEncryptionTestFile());
}
}