| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ipc; |
| |
| import com.google.common.base.Supplier; |
| import com.google.protobuf.ServiceException; |
| import org.apache.hadoop.HadoopIllegalArgumentException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryProxy; |
| import org.apache.hadoop.ipc.Client.ConnectionId; |
| import org.apache.hadoop.ipc.Server.Call; |
| import org.apache.hadoop.ipc.Server.Connection; |
| import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; |
| import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; |
| import org.apache.hadoop.ipc.protobuf.TestProtos; |
| import org.apache.hadoop.metrics2.MetricsRecordBuilder; |
| import org.apache.hadoop.metrics2.lib.MutableCounterLong; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; |
| import org.apache.hadoop.security.authorize.AuthorizationException; |
| import org.apache.hadoop.security.authorize.PolicyProvider; |
| import org.apache.hadoop.security.authorize.Service; |
| import org.apache.hadoop.security.token.SecretManager; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.MetricsAsserts; |
| import org.apache.hadoop.test.MockitoUtil; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.internal.util.reflection.Whitebox; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.event.Level; |
| |
| import javax.net.SocketFactory; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.lang.reflect.InvocationHandler; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Proxy; |
| import java.net.ConnectException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedAction; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static org.apache.hadoop.test.MetricsAsserts.assertCounter; |
| import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; |
| import static org.apache.hadoop.test.MetricsAsserts.assertGauge; |
| import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge; |
| import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; |
| import static org.apache.hadoop.test.MetricsAsserts.getMetrics; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNotSame; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.timeout; |
| import static org.mockito.Mockito.verify; |
| |
| /** Unit tests for RPC. */ |
| @SuppressWarnings("deprecation") |
| public class TestRPC extends TestRpcBase { |
| |
| public static final Logger LOG = LoggerFactory.getLogger(TestRPC.class); |
| |
| @Before |
| public void setup() { |
| setupConf(); |
| } |
| |
| int datasize = 1024*100; |
| int numThreads = 50; |
| |
| public interface TestProtocol extends VersionedProtocol { |
| long versionID = 1L; |
| |
| void ping() throws IOException; |
| void sleep(long delay) throws IOException, InterruptedException; |
| String echo(String value) throws IOException; |
| String[] echo(String[] value) throws IOException; |
| Writable echo(Writable value) throws IOException; |
| int add(int v1, int v2) throws IOException; |
| int add(int[] values) throws IOException; |
| int error() throws IOException; |
| } |
| |
| public static class TestImpl implements TestProtocol { |
| int fastPingCounter = 0; |
| |
| @Override |
| public long getProtocolVersion(String protocol, long clientVersion) { |
| return TestProtocol.versionID; |
| } |
| |
| @Override |
| public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, |
| int hashcode) { |
| return new ProtocolSignature(TestProtocol.versionID, null); |
| } |
| |
| @Override |
| public void ping() {} |
| |
| @Override |
| public void sleep(long delay) throws InterruptedException { |
| Thread.sleep(delay); |
| } |
| |
| @Override |
| public String echo(String value) throws IOException { return value; } |
| |
| @Override |
| public String[] echo(String[] values) throws IOException { return values; } |
| |
| @Override |
| public Writable echo(Writable writable) { |
| return writable; |
| } |
| @Override |
| public int add(int v1, int v2) { |
| return v1 + v2; |
| } |
| |
| @Override |
| public int add(int[] values) { |
| int sum = 0; |
| for (int i = 0; i < values.length; i++) { |
| sum += values[i]; |
| } |
| return sum; |
| } |
| |
| @Override |
| public int error() throws IOException { |
| throw new IOException("bobo"); |
| } |
| } |
| |
| // |
| // an object that does a bunch of transactions |
| // |
| static class Transactions implements Runnable { |
| int datasize; |
| TestRpcService proxy; |
| |
| Transactions(TestRpcService proxy, int datasize) { |
| this.proxy = proxy; |
| this.datasize = datasize; |
| } |
| |
| // do two RPC that transfers data. |
| @Override |
| public void run() { |
| Integer[] indata = new Integer[datasize]; |
| Arrays.fill(indata, 123); |
| TestProtos.ExchangeRequestProto exchangeRequest = |
| TestProtos.ExchangeRequestProto.newBuilder().addAllValues( |
| Arrays.asList(indata)).build(); |
| Integer[] outdata = null; |
| TestProtos.ExchangeResponseProto exchangeResponse; |
| |
| TestProtos.AddRequestProto addRequest = |
| TestProtos.AddRequestProto.newBuilder().setParam1(1) |
| .setParam2(2).build(); |
| TestProtos.AddResponseProto addResponse; |
| |
| int val = 0; |
| try { |
| exchangeResponse = proxy.exchange(null, exchangeRequest); |
| outdata = new Integer[exchangeResponse.getValuesCount()]; |
| outdata = exchangeResponse.getValuesList().toArray(outdata); |
| addResponse = proxy.add(null, addRequest); |
| val = addResponse.getResult(); |
| } catch (ServiceException e) { |
| assertTrue("Exception from RPC exchange() " + e, false); |
| } |
| assertEquals(indata.length, outdata.length); |
| assertEquals(3, val); |
| for (int i = 0; i < outdata.length; i++) { |
| assertEquals(outdata[i].intValue(), i); |
| } |
| } |
| } |
| |
| // |
| // A class that does an RPC but does not read its response. |
| // |
| static class SlowRPC implements Runnable { |
| private TestRpcService proxy; |
| private volatile boolean done; |
| |
| SlowRPC(TestRpcService proxy) { |
| this.proxy = proxy; |
| done = false; |
| } |
| |
| boolean isDone() { |
| return done; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| // this would hang until two fast pings happened |
| ping(true); |
| done = true; |
| } catch (ServiceException e) { |
| assertTrue("SlowRPC ping exception " + e, false); |
| } |
| } |
| |
| void ping(boolean shouldSlow) throws ServiceException { |
| // this would hang until two fast pings happened |
| proxy.slowPing(null, newSlowPingRequest(shouldSlow)); |
| } |
| } |
| |
| /** |
| * A basic interface for testing client-side RPC resource cleanup. |
| */ |
| private interface StoppedProtocol { |
| long versionID = 0; |
| |
| void stop(); |
| } |
| |
| /** |
| * A class used for testing cleanup of client side RPC resources. |
| */ |
| private static class StoppedRpcEngine implements RpcEngine { |
| |
| @Override |
| public <T> ProtocolProxy<T> getProxy( |
| Class<T> protocol, long clientVersion, InetSocketAddress addr, |
| UserGroupInformation ticket, Configuration conf, |
| SocketFactory factory, int rpcTimeout, |
| RetryPolicy connectionRetryPolicy) throws IOException { |
| return getProxy(protocol, clientVersion, addr, ticket, conf, factory, |
| rpcTimeout, connectionRetryPolicy, null, null); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public <T> ProtocolProxy<T> getProxy( |
| Class<T> protocol, long clientVersion, InetSocketAddress addr, |
| UserGroupInformation ticket, Configuration conf, SocketFactory factory, |
| int rpcTimeout, RetryPolicy connectionRetryPolicy, |
| AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) |
| throws IOException { |
| T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), |
| new Class[] { protocol }, new StoppedInvocationHandler()); |
| return new ProtocolProxy<T>(protocol, proxy, false); |
| } |
| |
| @Override |
| public org.apache.hadoop.ipc.RPC.Server getServer( |
| Class<?> protocol, Object instance, String bindAddress, int port, |
| int numHandlers, int numReaders, int queueSizePerHandler, |
| boolean verbose, Configuration conf, |
| SecretManager<? extends TokenIdentifier> secretManager, |
| String portRangeConfig, AlignmentContext alignmentContext) |
| throws IOException { |
| return null; |
| } |
| |
| @Override |
| public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy( |
| ConnectionId connId, Configuration conf, SocketFactory factory) |
| throws IOException { |
| throw new UnsupportedOperationException("This proxy is not supported"); |
| } |
| } |
| |
| /** |
| * An invocation handler which does nothing when invoking methods, and just |
| * counts the number of times close() is called. |
| */ |
| private static class StoppedInvocationHandler |
| implements InvocationHandler, Closeable { |
| |
| private int closeCalled = 0; |
| |
| @Override |
| public Object invoke(Object proxy, Method method, Object[] args) |
| throws Throwable { |
| return null; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| closeCalled++; |
| } |
| |
| public int getCloseCalled() { |
| return closeCalled; |
| } |
| |
| } |
| |
| @Test |
| public void testConfRpc() throws IOException { |
| Server server = newServerBuilder(conf) |
| .setNumHandlers(1).setVerbose(false).build(); |
| |
| // Just one handler |
| int confQ = conf.getInt( |
| CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, |
| CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); |
| assertEquals(confQ, server.getMaxQueueSize()); |
| |
| int confReaders = conf.getInt( |
| CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, |
| CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); |
| assertEquals(confReaders, server.getNumReaders()); |
| |
| server = newServerBuilder(conf) |
| .setNumHandlers(1).setnumReaders(3).setQueueSizePerHandler(200) |
| .setVerbose(false).build(); |
| |
| assertEquals(3, server.getNumReaders()); |
| assertEquals(200, server.getMaxQueueSize()); |
| |
| server = newServerBuilder(conf).setQueueSizePerHandler(10) |
| .setNumHandlers(2).setVerbose(false).build(); |
| assertEquals(2 * 10, server.getMaxQueueSize()); |
| } |
| |
| @Test |
| public void testProxyAddress() throws Exception { |
| Server server = null; |
| TestRpcService proxy = null; |
| |
| try { |
| server = setupTestServer(conf, -1); |
| // create a client |
| proxy = getClient(addr, conf); |
| assertEquals(addr, RPC.getServerAddress(proxy)); |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| @Test |
| public void testSlowRpc() throws IOException, ServiceException { |
| Server server; |
| TestRpcService proxy = null; |
| |
| System.out.println("Testing Slow RPC"); |
| // create a server with two handlers |
| server = setupTestServer(conf, 2); |
| |
| try { |
| // create a client |
| proxy = getClient(addr, conf); |
| |
| SlowRPC slowrpc = new SlowRPC(proxy); |
| Thread thread = new Thread(slowrpc, "SlowRPC"); |
| thread.start(); // send a slow RPC, which won't return until two fast pings |
| assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone()); |
| |
| slowrpc.ping(false); // first fast ping |
| |
| // verify that the first RPC is still stuck |
| assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone()); |
| |
| slowrpc.ping(false); // second fast ping |
| |
| // Now the slow ping should be able to be executed |
| while (!slowrpc.isDone()) { |
| System.out.println("Waiting for slow RPC to get done."); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) {} |
| } |
| } finally { |
| System.out.println("Down slow rpc testing"); |
| stop(server, proxy); |
| } |
| } |
| |
| @Test |
| public void testCalls() throws Exception { |
| testCallsInternal(conf); |
| } |
| |
| private void testCallsInternal(Configuration myConf) throws Exception { |
| Server server; |
| TestRpcService proxy = null; |
| |
| server = setupTestServer(myConf, -1); |
| try { |
| proxy = getClient(addr, myConf); |
| |
| proxy.ping(null, newEmptyRequest()); |
| |
| TestProtos.EchoResponseProto echoResp = proxy.echo(null, |
| newEchoRequest("foo")); |
| assertEquals(echoResp.getMessage(), "foo"); |
| |
| echoResp = proxy.echo(null, newEchoRequest("")); |
| assertEquals(echoResp.getMessage(), ""); |
| |
| // Check rpcMetrics |
| MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); |
| assertCounter("RpcProcessingTimeNumOps", 3L, rb); |
| assertCounterGt("SentBytes", 0L, rb); |
| assertCounterGt("ReceivedBytes", 0L, rb); |
| |
| // Number of calls to echo method should be 2 |
| rb = getMetrics(server.rpcDetailedMetrics.name()); |
| assertCounter("EchoNumOps", 2L, rb); |
| |
| // Number of calls to ping method should be 1 |
| assertCounter("PingNumOps", 1L, rb); |
| |
| String[] strings = new String[] {"foo","bar"}; |
| TestProtos.EchoRequestProto2 echoRequest2 = |
| TestProtos.EchoRequestProto2.newBuilder().addAllMessage( |
| Arrays.asList(strings)).build(); |
| TestProtos.EchoResponseProto2 echoResponse2 = |
| proxy.echo2(null, echoRequest2); |
| assertTrue(Arrays.equals(echoResponse2.getMessageList().toArray(), |
| strings)); |
| |
| echoRequest2 = TestProtos.EchoRequestProto2.newBuilder() |
| .addAllMessage(Collections.<String>emptyList()).build(); |
| echoResponse2 = proxy.echo2(null, echoRequest2); |
| assertTrue(Arrays.equals(echoResponse2.getMessageList().toArray(), |
| new String[]{})); |
| |
| TestProtos.AddRequestProto addRequest = |
| TestProtos.AddRequestProto.newBuilder().setParam1(1) |
| .setParam2(2).build(); |
| TestProtos.AddResponseProto addResponse = |
| proxy.add(null, addRequest); |
| assertEquals(addResponse.getResult(), 3); |
| |
| Integer[] integers = new Integer[] {1, 2}; |
| TestProtos.AddRequestProto2 addRequest2 = |
| TestProtos.AddRequestProto2.newBuilder().addAllParams( |
| Arrays.asList(integers)).build(); |
| addResponse = proxy.add2(null, addRequest2); |
| assertEquals(addResponse.getResult(), 3); |
| |
| boolean caught = false; |
| try { |
| proxy.error(null, newEmptyRequest()); |
| } catch (ServiceException e) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Caught " + e); |
| } |
| caught = true; |
| } |
| assertTrue(caught); |
| rb = getMetrics(server.rpcDetailedMetrics.name()); |
| assertCounter("RpcServerExceptionNumOps", 1L, rb); |
| |
| //proxy.testServerGet(); |
| |
| // create multiple threads and make them do large data transfers |
| System.out.println("Starting multi-threaded RPC test..."); |
| server.setSocketSendBufSize(1024); |
| Thread threadId[] = new Thread[numThreads]; |
| for (int i = 0; i < numThreads; i++) { |
| Transactions trans = new Transactions(proxy, datasize); |
| threadId[i] = new Thread(trans, "TransactionThread-" + i); |
| threadId[i].start(); |
| } |
| |
| // wait for all transactions to get over |
| System.out.println("Waiting for all threads to finish RPCs..."); |
| for (int i = 0; i < numThreads; i++) { |
| try { |
| threadId[i].join(); |
| } catch (InterruptedException e) { |
| i--; // retry |
| } |
| } |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| @Test |
| public void testClientWithoutServer() throws Exception { |
| TestRpcService proxy; |
| |
| short invalidPort = 20; |
| InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, |
| invalidPort); |
| long invalidClientVersion = 1L; |
| try { |
| proxy = RPC.getProxy(TestRpcService.class, |
| invalidClientVersion, invalidAddress, conf); |
| // Test echo method |
| proxy.echo(null, newEchoRequest("hello")); |
| fail("We should not have reached here"); |
| } catch (ServiceException ioe) { |
| //this is what we expected |
| if (!(ioe.getCause() instanceof ConnectException)) { |
| fail("We should not have reached here"); |
| } |
| } |
| } |
| |
| private static final String ACL_CONFIG = "test.protocol.acl"; |
| |
| private static class TestPolicyProvider extends PolicyProvider { |
| |
| @Override |
| public Service[] getServices() { |
| return new Service[] { new Service(ACL_CONFIG, TestRpcService.class) }; |
| } |
| } |
| |
| private void doRPCs(Configuration myConf, boolean expectFailure) throws Exception { |
| Server server; |
| TestRpcService proxy = null; |
| |
| server = setupTestServer(myConf, 5); |
| |
| server.refreshServiceAcl(myConf, new TestPolicyProvider()); |
| |
| TestProtos.EmptyRequestProto emptyRequestProto = |
| TestProtos.EmptyRequestProto.newBuilder().build(); |
| |
| try { |
| proxy = getClient(addr, conf); |
| proxy.ping(null, emptyRequestProto); |
| if (expectFailure) { |
| fail("Expect RPC.getProxy to fail with AuthorizationException!"); |
| } |
| } catch (ServiceException e) { |
| if (expectFailure) { |
| RemoteException re = (RemoteException) e.getCause(); |
| assertTrue(re.unwrapRemoteException() instanceof AuthorizationException); |
| assertEquals("RPC error code should be UNAUTHORIZED", |
| RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode()); |
| } else { |
| throw e; |
| } |
| } finally { |
| MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); |
| if (expectFailure) { |
| assertCounter("RpcAuthorizationFailures", 1L, rb); |
| } else { |
| assertCounter("RpcAuthorizationSuccesses", 1L, rb); |
| } |
| //since we don't have authentication turned ON, we should see |
| // 0 for the authentication successes and 0 for failure |
| assertCounter("RpcAuthenticationFailures", 0L, rb); |
| assertCounter("RpcAuthenticationSuccesses", 0L, rb); |
| |
| stop(server, proxy); |
| } |
| } |
| |
| @Test |
| public void testServerAddress() throws IOException { |
| Server server; |
| |
| server = setupTestServer(conf, 5); |
| try { |
| InetSocketAddress bindAddr = NetUtils.getConnectAddress(server); |
| assertEquals(InetAddress.getLocalHost(), bindAddr.getAddress()); |
| } finally { |
| stop(server, null); |
| } |
| } |
| |
| @Test |
| public void testAuthorization() throws Exception { |
| Configuration myConf = new Configuration(); |
| myConf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, |
| true); |
| |
| // Expect to succeed |
| myConf.set(ACL_CONFIG, "*"); |
| doRPCs(myConf, false); |
| |
| // Reset authorization to expect failure |
| myConf.set(ACL_CONFIG, "invalid invalid"); |
| doRPCs(myConf, true); |
| |
| myConf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); |
| // Expect to succeed |
| myConf.set(ACL_CONFIG, "*"); |
| doRPCs(myConf, false); |
| |
| // Reset authorization to expect failure |
| myConf.set(ACL_CONFIG, "invalid invalid"); |
| doRPCs(myConf, true); |
| } |
| |
| /** |
| * Switch off setting socketTimeout values on RPC sockets. |
| * Verify that RPC calls still work ok. |
| */ |
| public void testNoPings() throws Exception { |
| Configuration conf = new Configuration(); |
| |
| conf.setBoolean("ipc.client.ping", false); |
| new TestRPC().testCallsInternal(conf); |
| |
| conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); |
| new TestRPC().testCallsInternal(conf); |
| } |
| |
| /** |
| * Test stopping a non-registered proxy |
| * @throws IOException |
| */ |
| @Test(expected=HadoopIllegalArgumentException.class) |
| public void testStopNonRegisteredProxy() throws IOException { |
| RPC.stopProxy(null); |
| } |
| |
| /** |
| * Test that the mockProtocol helper returns mock proxies that can |
| * be stopped without error. |
| */ |
| @Test |
| public void testStopMockObject() throws IOException { |
| RPC.stopProxy(MockitoUtil.mockProtocol(TestProtocol.class)); |
| } |
| |
| @Test |
| public void testStopProxy() throws IOException { |
| RPC.setProtocolEngine(conf, |
| StoppedProtocol.class, StoppedRpcEngine.class); |
| |
| StoppedProtocol proxy = RPC.getProxy(StoppedProtocol.class, |
| StoppedProtocol.versionID, null, conf); |
| StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) |
| Proxy.getInvocationHandler(proxy); |
| assertEquals(0, invocationHandler.getCloseCalled()); |
| RPC.stopProxy(proxy); |
| assertEquals(1, invocationHandler.getCloseCalled()); |
| } |
| |
| @Test |
| public void testWrappedStopProxy() throws IOException { |
| StoppedProtocol wrappedProxy = RPC.getProxy(StoppedProtocol.class, |
| StoppedProtocol.versionID, null, conf); |
| StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) |
| Proxy.getInvocationHandler(wrappedProxy); |
| |
| StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create( |
| StoppedProtocol.class, wrappedProxy, RetryPolicies.RETRY_FOREVER); |
| |
| assertEquals(0, invocationHandler.getCloseCalled()); |
| RPC.stopProxy(proxy); |
| assertEquals(1, invocationHandler.getCloseCalled()); |
| } |
| |
| @Test |
| public void testErrorMsgForInsecureClient() throws IOException { |
| Server server; |
| TestRpcService proxy = null; |
| |
| Configuration serverConf = new Configuration(conf); |
| SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, |
| serverConf); |
| UserGroupInformation.setConfiguration(serverConf); |
| |
| server = setupTestServer(serverConf, 5); |
| |
| boolean succeeded = false; |
| |
| try { |
| UserGroupInformation.setConfiguration(conf); |
| proxy = getClient(addr, conf); |
| proxy.echo(null, newEchoRequest("")); |
| } catch (ServiceException e) { |
| assertTrue(e.getCause() instanceof RemoteException); |
| RemoteException re = (RemoteException) e.getCause(); |
| LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage()); |
| assertEquals("RPC error code should be UNAUTHORIZED", |
| RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode()); |
| assertTrue(re.unwrapRemoteException() instanceof AccessControlException); |
| succeeded = true; |
| } finally { |
| stop(server, proxy); |
| } |
| assertTrue(succeeded); |
| |
| conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); |
| |
| UserGroupInformation.setConfiguration(serverConf); |
| server = setupTestServer(serverConf, 5); |
| succeeded = false; |
| proxy = null; |
| try { |
| UserGroupInformation.setConfiguration(conf); |
| proxy = getClient(addr, conf); |
| proxy.echo(null, newEchoRequest("")); |
| } catch (ServiceException e) { |
| RemoteException re = (RemoteException) e.getCause(); |
| LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage()); |
| assertEquals("RPC error code should be UNAUTHORIZED", |
| RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode()); |
| assertTrue(re.unwrapRemoteException() instanceof AccessControlException); |
| succeeded = true; |
| } finally { |
| stop(server, proxy); |
| } |
| assertTrue(succeeded); |
| } |
| |
| /** |
| * Test that server.stop() properly stops all threads |
| */ |
| @Test |
| public void testStopsAllThreads() throws IOException, InterruptedException { |
| Server server; |
| |
| int threadsBefore = countThreads("Server$Listener$Reader"); |
| assertEquals("Expect no Reader threads running before test", |
| 0, threadsBefore); |
| |
| server = setupTestServer(conf, 5); |
| |
| try { |
| // Wait for at least one reader thread to start |
| int threadsRunning = 0; |
| long totalSleepTime = 0; |
| do { |
| totalSleepTime += 10; |
| Thread.sleep(10); |
| threadsRunning = countThreads("Server$Listener$Reader"); |
| } while (threadsRunning == 0 && totalSleepTime < 5000); |
| |
| // Validate that at least one thread started (we didn't timeout) |
| threadsRunning = countThreads("Server$Listener$Reader"); |
| assertTrue(threadsRunning > 0); |
| } finally { |
| server.stop(); |
| } |
| |
| int threadsAfter = countThreads("Server$Listener$Reader"); |
| assertEquals("Expect no Reader threads left running after test", |
| 0, threadsAfter); |
| } |
| |
| @Test |
| public void testRPCBuilder() throws IOException { |
| // Test mandatory field conf |
| try { |
| new RPC.Builder(null).setProtocol(TestProtocol.class) |
| .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) |
| .setNumHandlers(5).setVerbose(true).build(); |
| fail("Didn't throw HadoopIllegalArgumentException"); |
| } catch (Exception e) { |
| if (!(e instanceof HadoopIllegalArgumentException)) { |
| fail("Expecting HadoopIllegalArgumentException but caught " + e); |
| } |
| } |
| // Test mandatory field protocol |
| try { |
| new RPC.Builder(conf).setInstance(new TestImpl()).setBindAddress(ADDRESS) |
| .setPort(0).setNumHandlers(5).setVerbose(true).build(); |
| fail("Didn't throw HadoopIllegalArgumentException"); |
| } catch (Exception e) { |
| if (!(e instanceof HadoopIllegalArgumentException)) { |
| fail("Expecting HadoopIllegalArgumentException but caught " + e); |
| } |
| } |
| // Test mandatory field instance |
| try { |
| new RPC.Builder(conf).setProtocol(TestProtocol.class) |
| .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5) |
| .setVerbose(true).build(); |
| fail("Didn't throw HadoopIllegalArgumentException"); |
| } catch (Exception e) { |
| if (!(e instanceof HadoopIllegalArgumentException)) { |
| fail("Expecting HadoopIllegalArgumentException but caught " + e); |
| } |
| } |
| } |
| |
| @Test(timeout=90000) |
| public void testRPCInterruptedSimple() throws Exception { |
| Server server; |
| TestRpcService proxy = null; |
| |
| RPC.Builder builder = newServerBuilder(conf) |
| .setNumHandlers(5).setVerbose(true) |
| .setSecretManager(null); |
| |
| server = setupTestServer(builder); |
| |
| try { |
| proxy = getClient(addr, conf); |
| // Connect to the server |
| |
| proxy.ping(null, newEmptyRequest()); |
| // Interrupt self, try another call |
| Thread.currentThread().interrupt(); |
| try { |
| proxy.ping(null, newEmptyRequest()); |
| fail("Interruption did not cause IPC to fail"); |
| } catch (ServiceException se) { |
| if (se.toString().contains("InterruptedException") || |
| se.getCause() instanceof InterruptedIOException) { |
| // clear interrupt status for future tests |
| Thread.interrupted(); |
| return; |
| } |
| throw se; |
| } |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| @Test(timeout=30000) |
| public void testRPCInterrupted() throws Exception { |
| Server server; |
| |
| RPC.Builder builder = newServerBuilder(conf) |
| .setNumHandlers(5).setVerbose(true) |
| .setSecretManager(null); |
| server = setupTestServer(builder); |
| |
| int numConcurrentRPC = 200; |
| final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC); |
| final CountDownLatch latch = new CountDownLatch(numConcurrentRPC); |
| final AtomicBoolean leaderRunning = new AtomicBoolean(true); |
| final AtomicReference<Throwable> error = new AtomicReference<>(); |
| Thread leaderThread = null; |
| |
| try { |
| for (int i = 0; i < numConcurrentRPC; i++) { |
| final int num = i; |
| final TestRpcService proxy = getClient(addr, conf); |
| Thread rpcThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| barrier.await(); |
| while (num == 0 || leaderRunning.get()) { |
| proxy.slowPing(null, newSlowPingRequest(false)); |
| } |
| |
| proxy.slowPing(null, newSlowPingRequest(false)); |
| } catch (Exception e) { |
| if (num == 0) { |
| leaderRunning.set(false); |
| } else { |
| error.set(e); |
| } |
| |
| LOG.error("thread " + num, e); |
| } finally { |
| latch.countDown(); |
| } |
| } |
| }); |
| rpcThread.start(); |
| |
| if (leaderThread == null) { |
| leaderThread = rpcThread; |
| } |
| } |
| // let threads get past the barrier |
| Thread.sleep(1000); |
| // stop a single thread |
| while (leaderRunning.get()) { |
| leaderThread.interrupt(); |
| } |
| |
| latch.await(); |
| |
| // should not cause any other thread to get an error |
| assertTrue("rpc got exception " + error.get(), error.get() == null); |
| } finally { |
| server.stop(); |
| } |
| // let threads get past the barrier |
| Thread.sleep(1000); |
| // stop a single thread |
| while (leaderRunning.get()) { |
| leaderThread.interrupt(); |
| } |
| |
| latch.await(); |
| |
| // should not cause any other thread to get an error |
| assertTrue("rpc got exception " + error.get(), error.get() == null); |
| server.stop(); |
| } |
| |
| @Test |
| public void testConnectionPing() throws Exception { |
| Server server; |
| TestRpcService proxy = null; |
| |
| int pingInterval = 50; |
| conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); |
| conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval); |
| server = setupTestServer(conf, 5); |
| |
| try { |
| proxy = getClient(addr, conf); |
| |
| proxy.sleep(null, newSleepRequest(pingInterval * 4)); |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| @Test(timeout=30000) |
| public void testExternalCall() throws Exception { |
| final UserGroupInformation ugi = UserGroupInformation |
| .createUserForTesting("user123", new String[0]); |
| final IOException expectedIOE = new IOException("boom"); |
| |
| // use 1 handler so the callq can be plugged |
| final Server server = setupTestServer(conf, 1); |
| try { |
| final AtomicBoolean result = new AtomicBoolean(); |
| |
| ExternalCall<String> remoteUserCall = newExtCall(ugi, |
| new PrivilegedExceptionAction<String>() { |
| @Override |
| public String run() throws Exception { |
| return UserGroupInformation.getCurrentUser().getUserName(); |
| } |
| }); |
| |
| ExternalCall<String> exceptionCall = newExtCall(ugi, |
| new PrivilegedExceptionAction<String>() { |
| @Override |
| public String run() throws Exception { |
| throw expectedIOE; |
| } |
| }); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| final CyclicBarrier barrier = new CyclicBarrier(2); |
| |
| ExternalCall<Void> barrierCall = newExtCall(ugi, |
| new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| // notify we are in a handler and then wait to keep the callq |
| // plugged up |
| latch.countDown(); |
| barrier.await(); |
| return null; |
| } |
| }); |
| |
| server.queueCall(barrierCall); |
| server.queueCall(exceptionCall); |
| server.queueCall(remoteUserCall); |
| |
| // wait for barrier call to enter the handler, check that the other 2 |
| // calls are actually queued |
| latch.await(); |
| assertEquals(2, server.getCallQueueLen()); |
| |
| // unplug the callq |
| barrier.await(); |
| barrierCall.get(); |
| |
| // verify correct ugi is used |
| String answer = remoteUserCall.get(); |
| assertEquals(ugi.getUserName(), answer); |
| |
| try { |
| exceptionCall.get(); |
| fail("didn't throw"); |
| } catch (ExecutionException ee) { |
| assertTrue((ee.getCause()) instanceof IOException); |
| assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage()); |
| } |
| } finally { |
| server.stop(); |
| } |
| } |
| |
| private <T> ExternalCall<T> newExtCall(final UserGroupInformation ugi, |
| PrivilegedExceptionAction<T> callable) { |
| return new ExternalCall<T>(callable) { |
| @Override |
| public String getProtocol() { |
| return "test"; |
| } |
| @Override |
| public UserGroupInformation getRemoteUser() { |
| return ugi; |
| } |
| }; |
| } |
| |
| @Test |
| public void testRpcMetrics() throws Exception { |
| final Server server; |
| TestRpcService proxy = null; |
| |
| final int interval = 1; |
| conf.setBoolean(CommonConfigurationKeys. |
| RPC_METRICS_QUANTILE_ENABLE, true); |
| conf.set(CommonConfigurationKeys. |
| RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); |
| |
| server = setupTestServer(conf, 5); |
| String testUser = "testUser"; |
| UserGroupInformation anotherUser = |
| UserGroupInformation.createRemoteUser(testUser); |
| TestRpcService proxy2 = |
| anotherUser.doAs(new PrivilegedAction<TestRpcService>() { |
| public TestRpcService run() { |
| try { |
| return RPC.getProxy(TestRpcService.class, 0, |
| server.getListenerAddress(), conf); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| return null; |
| } |
| }); |
| try { |
| proxy = getClient(addr, conf); |
| |
| for (int i = 0; i < 1000; i++) { |
| proxy.ping(null, newEmptyRequest()); |
| |
| proxy.echo(null, newEchoRequest("" + i)); |
| proxy2.echo(null, newEchoRequest("" + i)); |
| } |
| MetricsRecordBuilder rpcMetrics = |
| getMetrics(server.getRpcMetrics().name()); |
| assertEquals("Expected correct rpc queue count", |
| 3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics)); |
| assertEquals("Expected correct rpc processing count", |
| 3000, getLongCounter("RpcProcessingTimeNumOps", rpcMetrics)); |
| assertEquals("Expected correct rpc lock wait count", |
| 3000, getLongCounter("RpcLockWaitTimeNumOps", rpcMetrics)); |
| assertEquals("Expected zero rpc lock wait time", |
| 0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001); |
| MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s", |
| rpcMetrics); |
| MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", |
| rpcMetrics); |
| String actualUserVsCon = MetricsAsserts |
| .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics); |
| String proxyUser = |
| UserGroupInformation.getCurrentUser().getShortUserName(); |
| assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1")); |
| assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1")); |
| |
| proxy.lockAndSleep(null, newSleepRequest(5)); |
| rpcMetrics = getMetrics(server.getRpcMetrics().name()); |
| assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics); |
| } finally { |
| if (proxy2 != null) { |
| RPC.stopProxy(proxy2); |
| } |
| stop(server, proxy); |
| } |
| } |
| |
| /** |
| * Test RPC backoff by queue full. |
| */ |
| @Test (timeout=30000) |
| public void testClientBackOff() throws Exception { |
| Server server; |
| final TestRpcService proxy; |
| |
| boolean succeeded = false; |
| final int numClients = 2; |
| final List<Future<Void>> res = new ArrayList<Future<Void>>(); |
| final ExecutorService executorService = |
| Executors.newFixedThreadPool(numClients); |
| conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); |
| conf.setBoolean(CommonConfigurationKeys.IPC_NAMESPACE + |
| ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); |
| RPC.Builder builder = newServerBuilder(conf) |
| .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); |
| server = setupTestServer(builder); |
| |
| @SuppressWarnings("unchecked") |
| CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox |
| .getInternalState(server, "callQueue")); |
| Whitebox.setInternalState(server, "callQueue", spy); |
| |
| Exception lastException = null; |
| proxy = getClient(addr, conf); |
| try { |
| // start a sleep RPC call to consume the only handler thread. |
| // Start another sleep RPC call to make callQueue full. |
| // Start another sleep RPC call to make reader thread block on CallQueue. |
| for (int i = 0; i < numClients; i++) { |
| res.add(executorService.submit( |
| new Callable<Void>() { |
| @Override |
| public Void call() throws ServiceException, InterruptedException { |
| proxy.sleep(null, newSleepRequest(100000)); |
| return null; |
| } |
| })); |
| verify(spy, timeout(500).times(i + 1)).addInternal( |
| Mockito.<Call>anyObject(), Mockito.eq(false)); |
| } |
| try { |
| proxy.sleep(null, newSleepRequest(100)); |
| } catch (ServiceException e) { |
| RemoteException re = (RemoteException) e.getCause(); |
| IOException unwrapExeption = re.unwrapRemoteException(); |
| if (unwrapExeption instanceof RetriableException) { |
| succeeded = true; |
| } else { |
| lastException = unwrapExeption; |
| } |
| } |
| } finally { |
| executorService.shutdown(); |
| stop(server, proxy); |
| } |
| if (lastException != null) { |
| LOG.error("Last received non-RetriableException:", lastException); |
| } |
| assertTrue("RetriableException not received", succeeded); |
| } |
| |
| /** |
| * Test RPC backoff by response time of each priority level. |
| */ |
| @Test (timeout=30000) |
| public void testClientBackOffByResponseTime() throws Exception { |
| final TestRpcService proxy; |
| boolean succeeded = false; |
| final int numClients = 1; |
| |
| GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG); |
| GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG); |
| |
| final List<Future<Void>> res = new ArrayList<Future<Void>>(); |
| final ExecutorService executorService = |
| Executors.newFixedThreadPool(numClients); |
| conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); |
| |
| final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; |
| Server server = setupDecayRpcSchedulerandTestServer(ns + "."); |
| |
| @SuppressWarnings("unchecked") |
| CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox |
| .getInternalState(server, "callQueue")); |
| Whitebox.setInternalState(server, "callQueue", spy); |
| |
| Exception lastException = null; |
| proxy = getClient(addr, conf); |
| |
| try { |
| // start a sleep RPC call that sleeps 3s. |
| for (int i = 0; i < numClients; i++) { |
| res.add(executorService.submit( |
| new Callable<Void>() { |
| @Override |
| public Void call() throws ServiceException, InterruptedException { |
| proxy.sleep(null, newSleepRequest(3000)); |
| return null; |
| } |
| })); |
| verify(spy, timeout(500).times(i + 1)).addInternal( |
| Mockito.<Call>anyObject(), Mockito.eq(false)); |
| } |
| // Start another sleep RPC call and verify the call is backed off due to |
| // avg response time(3s) exceeds threshold (2s). |
| try { |
| // wait for the 1st response time update |
| Thread.sleep(5500); |
| proxy.sleep(null, newSleepRequest(100)); |
| } catch (ServiceException e) { |
| RemoteException re = (RemoteException) e.getCause(); |
| IOException unwrapExeption = re.unwrapRemoteException(); |
| if (unwrapExeption instanceof RetriableException) { |
| succeeded = true; |
| } else { |
| lastException = unwrapExeption; |
| } |
| } |
| } finally { |
| executorService.shutdown(); |
| stop(server, proxy); |
| } |
| if (lastException != null) { |
| LOG.error("Last received non-RetriableException:", lastException); |
| } |
| assertTrue("RetriableException not received", succeeded); |
| } |
| |
| /** Test that the metrics for DecayRpcScheduler are updated. */ |
| @Test (timeout=30000) |
| public void testDecayRpcSchedulerMetrics() throws Exception { |
| final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; |
| Server server = setupDecayRpcSchedulerandTestServer(ns + "."); |
| |
| MetricsRecordBuilder rb1 = |
| getMetrics("DecayRpcSchedulerMetrics2." + ns); |
| final long beginDecayedCallVolume = MetricsAsserts.getLongCounter( |
| "DecayedCallVolume", rb1); |
| final long beginRawCallVolume = MetricsAsserts.getLongCounter( |
| "CallVolume", rb1); |
| final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", |
| rb1); |
| |
| TestRpcService proxy = getClient(addr, conf); |
| try { |
| for (int i = 0; i < 2; i++) { |
| proxy.sleep(null, newSleepRequest(100)); |
| } |
| |
| // Lets Metric system update latest metrics |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| MetricsRecordBuilder rb2 = |
| getMetrics("DecayRpcSchedulerMetrics2." + ns); |
| long decayedCallVolume1 = MetricsAsserts.getLongCounter( |
| "DecayedCallVolume", rb2); |
| long rawCallVolume1 = MetricsAsserts.getLongCounter( |
| "CallVolume", rb2); |
| int uniqueCaller1 = MetricsAsserts.getIntCounter( |
| "UniqueCallers", rb2); |
| long callVolumePriority0 = MetricsAsserts.getLongGauge( |
| "Priority.0.CompletedCallVolume", rb2); |
| long callVolumePriority1 = MetricsAsserts.getLongGauge( |
| "Priority.1.CompletedCallVolume", rb2); |
| double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge( |
| "Priority.0.AvgResponseTime", rb2); |
| double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge( |
| "Priority.1.AvgResponseTime", rb2); |
| |
| LOG.info("DecayedCallVolume: {}", decayedCallVolume1); |
| LOG.info("CallVolume: {}", rawCallVolume1); |
| LOG.info("UniqueCaller: {}", uniqueCaller1); |
| LOG.info("Priority.0.CompletedCallVolume: {}", callVolumePriority0); |
| LOG.info("Priority.1.CompletedCallVolume: {}", callVolumePriority1); |
| LOG.info("Priority.0.AvgResponseTime: {}", avgRespTimePriority0); |
| LOG.info("Priority.1.AvgResponseTime: {}", avgRespTimePriority1); |
| |
| return decayedCallVolume1 > beginDecayedCallVolume && |
| rawCallVolume1 > beginRawCallVolume && |
| uniqueCaller1 > beginUniqueCaller; |
| } |
| }, 30, 60000); |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| private Server setupDecayRpcSchedulerandTestServer(String ns) |
| throws Exception { |
| final int queueSizePerHandler = 3; |
| |
| conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); |
| conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); |
| conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, |
| "org.apache.hadoop.ipc.FairCallQueue"); |
| conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, |
| "org.apache.hadoop.ipc.DecayRpcScheduler"); |
| conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, |
| 2); |
| conf.setBoolean(ns + |
| DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY, |
| true); |
| // set a small thresholds 2s and 4s for level 0 and level 1 for testing |
| conf.set(ns + |
| DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY |
| , "2s, 4s"); |
| |
| // Set max queue size to 3 so that 2 calls from the test won't trigger |
| // back off because the queue is full. |
| RPC.Builder builder = newServerBuilder(conf) |
| .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1) |
| .setVerbose(true); |
| return setupTestServer(builder); |
| } |
| |
| /** |
| * Test RPC timeout. |
| */ |
| @Test(timeout=30000) |
| public void testClientRpcTimeout() throws Exception { |
| Server server; |
| TestRpcService proxy = null; |
| |
| RPC.Builder builder = newServerBuilder(conf) |
| .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); |
| server = setupTestServer(builder); |
| |
| try { |
| // Test RPC timeout with default ipc.client.ping. |
| try { |
| Configuration c = new Configuration(conf); |
| c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); |
| proxy = getClient(addr, c); |
| proxy.sleep(null, newSleepRequest(3000)); |
| fail("RPC should time out."); |
| } catch (ServiceException e) { |
| assertTrue(e.getCause() instanceof SocketTimeoutException); |
| LOG.info("got expected timeout.", e); |
| } |
| |
| // Test RPC timeout when ipc.client.ping is false. |
| try { |
| Configuration c = new Configuration(conf); |
| c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false); |
| c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); |
| proxy = getClient(addr, c); |
| proxy.sleep(null, newSleepRequest(3000)); |
| fail("RPC should time out."); |
| } catch (ServiceException e) { |
| assertTrue(e.getCause() instanceof SocketTimeoutException); |
| LOG.info("got expected timeout.", e); |
| } |
| |
| // Test negative timeout value. |
| try { |
| Configuration c = new Configuration(conf); |
| c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1); |
| proxy = getClient(addr, c); |
| proxy.sleep(null, newSleepRequest(2000)); |
| } catch (ServiceException e) { |
| LOG.info("got unexpected exception.", e); |
| fail("RPC should not time out."); |
| } |
| |
| // Test RPC timeout greater than ipc.ping.interval. |
| try { |
| Configuration c = new Configuration(conf); |
| c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); |
| c.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800); |
| c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); |
| proxy = getClient(addr, c); |
| |
| try { |
| // should not time out because effective rpc-timeout is |
| // multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1)) |
| proxy.sleep(null, newSleepRequest(1300)); |
| } catch (ServiceException e) { |
| LOG.info("got unexpected exception.", e); |
| fail("RPC should not time out."); |
| } |
| |
| proxy.sleep(null, newSleepRequest(2000)); |
| fail("RPC should time out."); |
| } catch (ServiceException e) { |
| assertTrue(e.getCause() instanceof SocketTimeoutException); |
| LOG.info("got expected timeout.", e); |
| } |
| |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| public static class FakeRequestClass extends RpcWritable { |
| static volatile IOException exception; |
| @Override |
| void writeTo(ResponseBuffer out) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| @Override |
| <T> T readFrom(ByteBuffer bb) throws IOException { |
| throw exception; |
| } |
| } |
| |
| @SuppressWarnings("serial") |
| public static class TestReaderException extends IOException { |
| public TestReaderException(String msg) { |
| super(msg); |
| } |
| @Override |
| public boolean equals(Object t) { |
| return (t.getClass() == TestReaderException.class) && |
| getMessage().equals(((TestReaderException)t).getMessage()); |
| } |
| } |
| |
| @Test (timeout=30000) |
| public void testReaderExceptions() throws Exception { |
| Server server = null; |
| TestRpcService proxy = null; |
| |
| // will attempt to return this exception from a reader with and w/o |
| // the connection closing. |
| IOException expectedIOE = new TestReaderException("testing123"); |
| |
| @SuppressWarnings("serial") |
| IOException rseError = new RpcServerException("keepalive", expectedIOE){ |
| @Override |
| public RpcStatusProto getRpcStatusProto() { |
| return RpcStatusProto.ERROR; |
| } |
| }; |
| @SuppressWarnings("serial") |
| IOException rseFatal = new RpcServerException("disconnect", expectedIOE) { |
| @Override |
| public RpcStatusProto getRpcStatusProto() { |
| return RpcStatusProto.FATAL; |
| } |
| }; |
| |
| try { |
| RPC.Builder builder = newServerBuilder(conf) |
| .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); |
| server = setupTestServer(builder); |
| Whitebox.setInternalState( |
| server, "rpcRequestClass", FakeRequestClass.class); |
| MutableCounterLong authMetric = |
| (MutableCounterLong)Whitebox.getInternalState( |
| server.getRpcMetrics(), "rpcAuthorizationSuccesses"); |
| |
| proxy = getClient(addr, conf); |
| boolean isDisconnected = true; |
| Connection lastConn = null; |
| long expectedAuths = 0; |
| |
| // fuzz the client. |
| for (int i=0; i < 128; i++) { |
| String reqName = "request[" + i + "]"; |
| int r = ThreadLocalRandom.current().nextInt(); |
| final boolean doDisconnect = r % 4 == 0; |
| LOG.info("TestDisconnect request[" + i + "] " + |
| " shouldConnect=" + isDisconnected + |
| " willDisconnect=" + doDisconnect); |
| if (isDisconnected) { |
| expectedAuths++; |
| } |
| try { |
| FakeRequestClass.exception = doDisconnect ? rseFatal : rseError; |
| proxy.ping(null, newEmptyRequest()); |
| fail(reqName + " didn't fail"); |
| } catch (ServiceException e) { |
| RemoteException re = (RemoteException)e.getCause(); |
| assertEquals(reqName, expectedIOE, re.unwrapRemoteException()); |
| } |
| // check authorizations to ensure new connection when expected, |
| // then conclusively determine if connections are disconnected |
| // correctly. |
| assertEquals(reqName, expectedAuths, authMetric.value()); |
| if (!doDisconnect) { |
| // if it wasn't fatal, verify there's only one open connection. |
| Connection[] conns = server.getConnections(); |
| assertEquals(reqName, 1, conns.length); |
| // verify whether the connection should have been reused. |
| if (isDisconnected) { |
| assertNotSame(reqName, lastConn, conns[0]); |
| } else { |
| assertSame(reqName, lastConn, conns[0]); |
| } |
| lastConn = conns[0]; |
| } else if (lastConn != null) { |
| // avoid race condition in server where connection may not be |
| // fully removed yet. just make sure it's marked for being closed. |
| // the open connection checks above ensure correct behavior. |
| assertTrue(reqName, lastConn.shouldClose()); |
| } |
| isDisconnected = doDisconnect; |
| } |
| } finally { |
| stop(server, proxy); |
| } |
| } |
| |
| public static void main(String[] args) throws Exception { |
| new TestRPC().testCallsInternal(conf); |
| } |
| } |