/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
/**
 * Tests for HBASE-28321, where we have multiple server principal candidates for an rpc service.
 * <p>
 * Placed in this package because we need to access some package private classes under it.
 */
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, MediumTests.class })
public class TestMultipleServerPrincipalsIPC {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMultipleServerPrincipalsIPC.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String SERVER_PRINCIPAL;
private static String SERVER_PRINCIPAL2;
private static String CLIENT_PRINCIPAL;
@Parameter(0)
public Class<? extends RpcServer> rpcServerImpl;
@Parameter(1)
public Class<? extends RpcClient> rpcClientImpl;
private Configuration clientConf;
private Configuration serverConf;
private UserGroupInformation clientUGI;
private UserGroupInformation serverUGI;
private RpcServer rpcServer;
private RpcClient rpcClient;
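/**
 * Run every test case against all combinations of RpcServer (netty and simple) and RpcClient
 * (netty and blocking) implementations.
 */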
@Parameters(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}")
public static List<Object[]> params() {
List<Object[]> params = new ArrayList<>();
List<Class<? extends RpcServer>> rpcServerImpls =
Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class);
List<Class<? extends RpcClient>> rpcClientImpls =
Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class);
for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) {
for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) {
params.add(new Object[] { rpcServerImpl, rpcClientImpl });
}
}
return params;
}
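/**
 * Start a MiniKdc, create the client principal and the two server principals, and switch the
 * shared configuration to kerberos. The relogin and failed-server-expiry settings are tightened
 * so the retry logic exercised by the tests does not have to wait long.
 */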
@BeforeClass
public static void setUpBeforeClass() throws Exception {
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
SERVER_PRINCIPAL = "server/" + HOST + "@" + KDC.getRealm();
SERVER_PRINCIPAL2 = "server2/" + HOST + "@" + KDC.getRealm();
CLIENT_PRINCIPAL = "client";
KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2);
setSecuredConfiguration(TEST_UTIL.getConfiguration());
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0);
TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10);
}
@AfterClass
public static void tearDownAfterClass() {
if (KDC != null) {
KDC.stop();
}
}
private static void setSecuredConfiguration(Configuration conf) {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
}
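/**
 * Log the server side in from the keytab with the given principal and start an rpc server
 * exposing the test protobuf service on the given port (0 means an ephemeral port).
 */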
private void loginAndStartRpcServer(String principal, int port) throws Exception {
UserGroupInformation.setConfiguration(serverConf);
serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,
KEYTAB_FILE.getCanonicalPath());
rpcServer = serverUGI.doAs((PrivilegedExceptionAction<
RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(),
Lists.newArrayList(
new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)),
new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1)));
rpcServer.start();
}
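/**
 * Register the two candidate server principal config keys for the test service via
 * {@link SecurityInfo}, log the client in from the keytab, start the rpc server with the first
 * server principal, and create the rpc client under test.
 */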
@Before
public void setUp() throws Exception {
clientConf = new Configuration(TEST_UTIL.getConfiguration());
clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl,
RpcClient.class);
String serverPrincipalConfigName = "hbase.test.multiple.principal.first";
String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second";
clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL);
clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2);
serverConf = new Configuration(TEST_UTIL.getConfiguration());
serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl,
RpcServer.class);
SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2,
serverPrincipalConfigName);
SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo);
UserGroupInformation.setConfiguration(clientConf);
clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL,
KEYTAB_FILE.getCanonicalPath());
loginAndStartRpcServer(SERVER_PRINCIPAL, 0);
rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory
.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString()));
}
@After
public void tearDown() throws IOException {
Closeables.close(rpcClient, true);
rpcServer.stop();
}
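/**
 * Issue a blocking echo call to the running rpc server as the client user and return the echoed
 * message.
 */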
private String echo(String msg) throws Exception {
return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> {
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(),
10000);
TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel);
return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build())
.getMessage();
});
}
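/**
 * Basic positive case: the server principal is one of the configured candidates, so sasl
 * negotiation and the echo call should succeed.
 */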
@Test
public void testEcho() throws Exception {
String msg = "Hello World";
assertEquals(msg, echo(msg));
}
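/**
 * If the principal presented by the server does not match any of the configured candidates, the
 * client must refuse to complete the sasl negotiation.
 */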
@Test
public void testMaliciousServer() throws Exception {
// reset the configured server principals so the principal returned by the server does not match
// any of them
SecurityInfo securityInfo =
SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName());
for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) {
clientConf.set(securityInfo.getServerPrincipals().get(i),
"valid_server_" + i + "/" + HOST + "@" + KDC.getRealm());
}
UndeclaredThrowableException error =
assertThrows(UndeclaredThrowableException.class, () -> echo("whatever"));
assertThat(error.getCause(), instanceOf(ServiceException.class));
assertThat(error.getCause().getCause(), instanceOf(SaslException.class));
}
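/**
 * After a successful negotiation the client remembers the server principal it used. When the
 * server is restarted with a different principal the first attempt fails with a SaslException,
 * which clears the remembered principal, and a subsequent retry succeeds with the other
 * candidate.
 */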
@Test
public void testRememberLastSucceededServerPrincipal() throws Exception {
// after this call we will remember the last successfully negotiated server principal
assertEquals("a", echo("a"));
// shut down the connection, but do not remove it from the pool
RpcConnection conn =
Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values());
conn.shutdown();
// recreate the rpc server with the second server principal
int port = rpcServer.getListenerAddress().getPort();
rpcServer.stop();
serverUGI.logoutUserFromKeytab();
loginAndStartRpcServer(SERVER_PRINCIPAL2, port);
// this time we will still use the remembered server principal, so we will get a sasl exception
UndeclaredThrowableException error =
assertThrows(UndeclaredThrowableException.class, () -> echo("a"));
assertThat(error.getCause(), instanceOf(ServiceException.class));
// created by IPCUtil.wrap, to prepend the server address
assertThat(error.getCause().getCause(), instanceOf(IOException.class));
// wrapped by IPCUtil.toIOE
assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class));
Throwable cause = error.getCause().getCause().getCause().getCause();
// for the netty rpc client it is a DecoderException, for the blocking rpc client it is already
// a RemoteException
assertThat(cause,
either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class)));
RemoteException rme;
if (!(cause instanceof RemoteException)) {
assertThat(cause.getCause(), instanceOf(RemoteException.class));
rme = (RemoteException) cause.getCause();
} else {
rme = (RemoteException) cause;
}
assertEquals(SaslException.class.getName(), rme.getClassName());
// the above failure will clear the remembered server principal, so this time we will pick the
// correct one. We retry here because a failed sasl negotiation triggers a relogin which may take
// some time, and for the netty based implementation the relogin is asynchronous
TEST_UTIL.waitFor(10000, () -> {
try {
echo("a");
} catch (UndeclaredThrowableException e) {
Throwable t = e.getCause().getCause();
assertThat(t, instanceOf(IOException.class));
if (!(t instanceof FailedServerException)) {
// for netty rpc client
assertThat(e.getCause().getMessage(),
containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS));
}
return false;
}
return true;
});
}
}