blob: 09f21d7e650a33a086b27b0abdaef9f4a861c2b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, LargeTests.class })
public class TestSecureIPC {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSecureIPC.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final File KEYTAB_FILE = new File(
TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String PRINCIPAL;
String krbKeytab;
String krbPrincipal;
UserGroupInformation ugi;
Configuration clientConf;
Configuration serverConf;
@Rule
public ExpectedException exception = ExpectedException.none();
@Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
List<String> rpcClientImpls = Arrays.asList(
BlockingRpcClient.class.getName(), NettyRpcClient.class.getName());
List<String> rpcServerImpls = Arrays.asList(
SimpleRpcServer.class.getName(), NettyRpcServer.class.getName());
for (String rpcClientImpl : rpcClientImpls) {
for (String rpcServerImpl : rpcServerImpls) {
params.add(new Object[] { rpcClientImpl, rpcServerImpl });
}
}
return params;
}
@Parameter(0)
public String rpcClientImpl;
@Parameter(1)
public String rpcServerImpl;
@BeforeClass
public static void setUp() throws Exception {
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
PRINCIPAL = "hbase/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
}
@AfterClass
public static void tearDown() throws IOException {
if (KDC != null) {
KDC.stop();
}
TEST_UTIL.cleanupTestDir();
}
@Before
public void setUpTest() throws Exception {
krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = getSecuredConfiguration();
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
serverConf = getSecuredConfiguration();
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
rpcServerImpl);
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth_CanonicalHostname() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
enableCanonicalHostnameTesting(clientConf, "localhost");
clientConf.setBoolean(
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
callRpcService(User.create(ugi2));
}
@Test
public void testRpcCallWithEnabledKerberosSaslAuth_NoCanonicalHostname() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
// check that the login user is okay:
assertSame(ugi2, ugi);
assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
clientConf.setBoolean(
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
callRpcService(User.create(ugi2));
}
private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
conf.setClass(SELECTOR_KEY,
CanonicalHostnameTestingAuthenticationProviderSelector.class,
AuthenticationProviderSelector.class);
conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
canonicalHostname);
}
public static class CanonicalHostnameTestingAuthenticationProviderSelector extends
BuiltInProviderSelector {
private static final String CANONICAL_HOST_NAME_KEY =
"CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
@Override
public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
String clusterId, User user) {
final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
super.selectProvider(clusterId, user);
pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
return pair;
}
SaslClientAuthenticationProvider createCanonicalHostNameTestingProvider(
SaslClientAuthenticationProvider delegate) {
return new SaslClientAuthenticationProvider() {
@Override
public SaslClient createClient(Configuration conf, InetAddress serverAddr,
SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
boolean fallbackAllowed, Map<String, String> saslProps) throws IOException {
final String s =
conf.get(CANONICAL_HOST_NAME_KEY);
if (s != null) {
try {
final Field canonicalHostName = InetAddress.class.getDeclaredField("canonicalHostName");
canonicalHostName.setAccessible(true);
canonicalHostName.set(serverAddr, s);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed, saslProps);
}
@Override
public UserInformation getUserInfo(User user) {
return delegate.getUserInfo(user);
}
@Override
public UserGroupInformation getRealUser(User ugi) {
return delegate.getRealUser(ugi);
}
@Override
public boolean canRetry() {
return delegate.canRetry();
}
@Override
public void relogin() throws IOException {
delegate.relogin();
}
@Override
public SaslAuthMethod getSaslAuthMethod() {
return delegate.getSaslAuthMethod();
}
@Override
public String getTokenKind() {
return delegate.getTokenKind();
}
};
}
}
@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
new String[] { clientUsername });
// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
callRpcService(User.create(clientUgi));
}
void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
}
/**
* Test various combinations of Server and Client qops.
* @throws Exception
*/
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "integrity,authentication");
callRpcService(User.create(ugi));
setRpcProtection("privacy,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
}
@Test
public void testSaslNoCommonQop() throws Exception {
exception.expect(SaslException.class);
exception.expectMessage("No common protection layer between client and server");
setRpcProtection("integrity", "privacy");
callRpcService(User.create(ugi));
}
/**
* Test sasl encryption with Crypto AES.
* @throws Exception
*/
@Test
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
}
/**
* Test various combinations of Server and Client configuration for Crypto AES.
* @throws Exception
*/
@Test
public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("false", "true");
callRpcService(User.create(ugi));
setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
}
}
void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
}
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration cnf = new Configuration();
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(cnf);
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
return UserGroupInformation.getLoginUser();
}
/**
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
* the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa,
serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
HConstants.DEFAULT_CLUSTER_ID.toString())) {
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
clientUser);
TestThread th1 = new TestThread(stub);
final Throwable exception[] = new Throwable[1];
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
exception[0] = ex;
}
};
th1.setUncaughtExceptionHandler(exceptionHandler);
th1.start();
th1.join();
if (exception[0] != null) {
// throw root cause.
while (exception[0].getCause() != null) {
exception[0] = exception[0].getCause();
}
throw (Exception) exception[0];
}
} finally {
rpcServer.stop();
}
}
public static class TestThread extends Thread {
private final BlockingInterface stub;
public TestThread(BlockingInterface stub) {
this.stub = stub;
}
@Override
public void run() {
try {
int[] messageSize = new int[] { 100, 1000, 10000 };
for (int i = 0; i < messageSize.length; i++) {
String input = RandomStringUtils.random(messageSize[i]);
String result = stub
.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
.getMessage();
assertEquals(input, result);
}
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
throw new RuntimeException(e);
}
}
}
}