/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.ipc;

import com.google.common.base.Supplier;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import javax.net.SocketFactory;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

/** Unit tests for RPC. */
@SuppressWarnings("deprecation")
public class TestRPC extends TestRpcBase {

  public static final Logger LOG = LoggerFactory.getLogger(TestRPC.class);

  @Before
  public void setup() {
    setupConf();
  }

  int datasize = 1024*100;
  int numThreads = 50;

  public interface TestProtocol extends VersionedProtocol {
    long versionID = 1L;

    void ping() throws IOException;
    void sleep(long delay) throws IOException, InterruptedException;
    String echo(String value) throws IOException;
    String[] echo(String[] value) throws IOException;
    Writable echo(Writable value) throws IOException;
    int add(int v1, int v2) throws IOException;
    int add(int[] values) throws IOException;
    int error() throws IOException;
  }

  public static class TestImpl implements TestProtocol {
    int fastPingCounter = 0;
    
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) {
      return TestProtocol.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
                                                  int hashcode) {
      return new ProtocolSignature(TestProtocol.versionID, null);
    }

    @Override
    public void ping() {}
    
    @Override
    public void sleep(long delay) throws InterruptedException {
      Thread.sleep(delay);
    }

    @Override
    public String echo(String value) throws IOException { return value; }

    @Override
    public String[] echo(String[] values) throws IOException { return values; }

    @Override
    public Writable echo(Writable writable) {
      return writable;
    }
    @Override
    public int add(int v1, int v2) {
      return v1 + v2;
    }

    @Override
    public int add(int[] values) {
      int sum = 0;
      for (int i = 0; i < values.length; i++) {
        sum += values[i];
      }
      return sum;
    }

    @Override
    public int error() throws IOException {
      throw new IOException("bobo");
    }
  }

  //
  // an object that does a bunch of transactions
  //
  static class Transactions implements Runnable {
    int datasize;
    TestRpcService proxy;

    Transactions(TestRpcService proxy, int datasize) {
      this.proxy = proxy;
      this.datasize = datasize;
    }

    // do two RPC that transfers data.
    @Override
    public void run() {
      Integer[] indata = new Integer[datasize];
      Arrays.fill(indata, 123);
      TestProtos.ExchangeRequestProto exchangeRequest =
          TestProtos.ExchangeRequestProto.newBuilder().addAllValues(
              Arrays.asList(indata)).build();
      Integer[] outdata = null;
      TestProtos.ExchangeResponseProto exchangeResponse;

      TestProtos.AddRequestProto addRequest =
          TestProtos.AddRequestProto.newBuilder().setParam1(1)
              .setParam2(2).build();
      TestProtos.AddResponseProto addResponse;

      int val = 0;
      try {
        exchangeResponse = proxy.exchange(null, exchangeRequest);
        outdata = new Integer[exchangeResponse.getValuesCount()];
        outdata = exchangeResponse.getValuesList().toArray(outdata);
        addResponse = proxy.add(null, addRequest);
        val = addResponse.getResult();
      } catch (ServiceException e) {
        assertTrue("Exception from RPC exchange() "  + e, false);
      }
      assertEquals(indata.length, outdata.length);
      assertEquals(3, val);
      for (int i = 0; i < outdata.length; i++) {
        assertEquals(outdata[i].intValue(), i);
      }
    }
  }

  //
  // A class that does an RPC but does not read its response.
  //
  static class SlowRPC implements Runnable {
    private TestRpcService proxy;
    private volatile boolean done;

    SlowRPC(TestRpcService proxy) {
      this.proxy = proxy;
      done = false;
    }

    boolean isDone() {
      return done;
    }

    @Override
    public void run() {
      try {
        // this would hang until two fast pings happened
        ping(true);
        done = true;
      } catch (ServiceException e) {
        assertTrue("SlowRPC ping exception " + e, false);
      }
    }

    void ping(boolean shouldSlow) throws ServiceException {
      // this would hang until two fast pings happened
      proxy.slowPing(null, newSlowPingRequest(shouldSlow));
    }
  }

  /**
   * A basic interface for testing client-side RPC resource cleanup.
   */
  private interface StoppedProtocol {
    long versionID = 0;

    void stop();
  }

  /**
   * A class used for testing cleanup of client side RPC resources.
   */
  private static class StoppedRpcEngine implements RpcEngine {

    @Override
    public <T> ProtocolProxy<T> getProxy(
        Class<T> protocol, long clientVersion, InetSocketAddress addr,
        UserGroupInformation ticket, Configuration conf,
        SocketFactory factory, int rpcTimeout,
        RetryPolicy connectionRetryPolicy) throws IOException {
      return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
          rpcTimeout, connectionRetryPolicy, null, null);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> ProtocolProxy<T> getProxy(
        Class<T> protocol, long clientVersion, InetSocketAddress addr,
        UserGroupInformation ticket, Configuration conf, SocketFactory factory,
        int rpcTimeout, RetryPolicy connectionRetryPolicy,
        AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
        throws IOException {
      T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
          new Class[] { protocol }, new StoppedInvocationHandler());
      return new ProtocolProxy<T>(protocol, proxy, false);
    }

    @Override
    public org.apache.hadoop.ipc.RPC.Server getServer(
        Class<?> protocol, Object instance, String bindAddress, int port,
        int numHandlers, int numReaders, int queueSizePerHandler,
        boolean verbose, Configuration conf,
        SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig, AlignmentContext alignmentContext)
        throws IOException {
      return null;
    }

    @Override
    public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
        ConnectionId connId, Configuration conf, SocketFactory factory)
        throws IOException {
      throw new UnsupportedOperationException("This proxy is not supported");
    }
  }

  /**
   * An invocation handler which does nothing when invoking methods, and just
   * counts the number of times close() is called.
   */
  private static class StoppedInvocationHandler
      implements InvocationHandler, Closeable {

    private int closeCalled = 0;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      return null;
    }

    @Override
    public void close() throws IOException {
      closeCalled++;
    }

    public int getCloseCalled() {
      return closeCalled;
    }

  }

  @Test
  public void testConfRpc() throws IOException {
    Server server = newServerBuilder(conf)
        .setNumHandlers(1).setVerbose(false).build();

    // Just one handler
    int confQ = conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
    assertEquals(confQ, server.getMaxQueueSize());

    int confReaders = conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
    assertEquals(confReaders, server.getNumReaders());

    server = newServerBuilder(conf)
        .setNumHandlers(1).setnumReaders(3).setQueueSizePerHandler(200)
        .setVerbose(false).build();

    assertEquals(3, server.getNumReaders());
    assertEquals(200, server.getMaxQueueSize());

    server = newServerBuilder(conf).setQueueSizePerHandler(10)
        .setNumHandlers(2).setVerbose(false).build();
    assertEquals(2 * 10, server.getMaxQueueSize());
  }

  @Test
  public void testProxyAddress() throws Exception {
    Server server = null;
    TestRpcService proxy = null;

    try {
      server = setupTestServer(conf, -1);
      // create a client
      proxy = getClient(addr, conf);
      assertEquals(addr, RPC.getServerAddress(proxy));
    } finally {
      stop(server, proxy);
    }
  }

  @Test
  public void testSlowRpc() throws IOException, ServiceException {
    Server server;
    TestRpcService proxy = null;

    System.out.println("Testing Slow RPC");
    // create a server with two handlers
    server = setupTestServer(conf, 2);

    try {
      // create a client
      proxy = getClient(addr, conf);

      SlowRPC slowrpc = new SlowRPC(proxy);
      Thread thread = new Thread(slowrpc, "SlowRPC");
      thread.start(); // send a slow RPC, which won't return until two fast pings
      assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());

      slowrpc.ping(false); // first fast ping

      // verify that the first RPC is still stuck
      assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());

      slowrpc.ping(false); // second fast ping

      // Now the slow ping should be able to be executed
      while (!slowrpc.isDone()) {
        System.out.println("Waiting for slow RPC to get done.");
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {}
      }
    } finally {
      System.out.println("Down slow rpc testing");
      stop(server, proxy);
    }
  }

  @Test
  public void testCalls() throws Exception {
    testCallsInternal(conf);
  }

  private void testCallsInternal(Configuration myConf) throws Exception {
    Server server;
    TestRpcService proxy = null;

    server = setupTestServer(myConf, -1);
    try {
      proxy = getClient(addr, myConf);

      proxy.ping(null, newEmptyRequest());

      TestProtos.EchoResponseProto echoResp = proxy.echo(null,
          newEchoRequest("foo"));
      assertEquals(echoResp.getMessage(), "foo");

      echoResp = proxy.echo(null, newEchoRequest(""));
      assertEquals(echoResp.getMessage(), "");

      // Check rpcMetrics
      MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
      assertCounter("RpcProcessingTimeNumOps", 3L, rb);
      assertCounterGt("SentBytes", 0L, rb);
      assertCounterGt("ReceivedBytes", 0L, rb);

      // Number of calls to echo method should be 2
      rb = getMetrics(server.rpcDetailedMetrics.name());
      assertCounter("EchoNumOps", 2L, rb);

      // Number of calls to ping method should be 1
      assertCounter("PingNumOps", 1L, rb);

      String[] strings = new String[] {"foo","bar"};
      TestProtos.EchoRequestProto2 echoRequest2 =
          TestProtos.EchoRequestProto2.newBuilder().addAllMessage(
              Arrays.asList(strings)).build();
      TestProtos.EchoResponseProto2 echoResponse2 =
          proxy.echo2(null, echoRequest2);
      assertTrue(Arrays.equals(echoResponse2.getMessageList().toArray(),
          strings));

      echoRequest2 = TestProtos.EchoRequestProto2.newBuilder()
          .addAllMessage(Collections.<String>emptyList()).build();
      echoResponse2 = proxy.echo2(null, echoRequest2);
      assertTrue(Arrays.equals(echoResponse2.getMessageList().toArray(),
          new String[]{}));

      TestProtos.AddRequestProto addRequest =
          TestProtos.AddRequestProto.newBuilder().setParam1(1)
              .setParam2(2).build();
      TestProtos.AddResponseProto addResponse =
          proxy.add(null, addRequest);
      assertEquals(addResponse.getResult(), 3);

      Integer[] integers = new Integer[] {1, 2};
      TestProtos.AddRequestProto2 addRequest2 =
          TestProtos.AddRequestProto2.newBuilder().addAllParams(
              Arrays.asList(integers)).build();
      addResponse = proxy.add2(null, addRequest2);
      assertEquals(addResponse.getResult(), 3);

      boolean caught = false;
      try {
        proxy.error(null, newEmptyRequest());
      } catch (ServiceException e) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Caught " + e);
        }
        caught = true;
      }
      assertTrue(caught);
      rb = getMetrics(server.rpcDetailedMetrics.name());
      assertCounter("RpcServerExceptionNumOps", 1L, rb);

      //proxy.testServerGet();

      // create multiple threads and make them do large data transfers
      System.out.println("Starting multi-threaded RPC test...");
      server.setSocketSendBufSize(1024);
      Thread threadId[] = new Thread[numThreads];
      for (int i = 0; i < numThreads; i++) {
        Transactions trans = new Transactions(proxy, datasize);
        threadId[i] = new Thread(trans, "TransactionThread-" + i);
        threadId[i].start();
      }

      // wait for all transactions to get over
      System.out.println("Waiting for all threads to finish RPCs...");
      for (int i = 0; i < numThreads; i++) {
        try {
          threadId[i].join();
        } catch (InterruptedException e) {
          i--;      // retry
        }
      }
    } finally {
      stop(server, proxy);
    }
  }

  @Test
  public void testClientWithoutServer() throws Exception {
    TestRpcService proxy;

    short invalidPort = 20;
    InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS,
        invalidPort);
    long invalidClientVersion = 1L;
    try {
      proxy = RPC.getProxy(TestRpcService.class,
          invalidClientVersion, invalidAddress, conf);
      // Test echo method
      proxy.echo(null, newEchoRequest("hello"));
      fail("We should not have reached here");
    } catch (ServiceException ioe) {
      //this is what we expected
      if (!(ioe.getCause() instanceof ConnectException)) {
        fail("We should not have reached here");
      }
    }
  }

  private static final String ACL_CONFIG = "test.protocol.acl";

  private static class TestPolicyProvider extends PolicyProvider {

    @Override
    public Service[] getServices() {
      return new Service[] { new Service(ACL_CONFIG, TestRpcService.class) };
    }
  }

  private void doRPCs(Configuration myConf, boolean expectFailure) throws Exception {
    Server server;
    TestRpcService proxy = null;

    server = setupTestServer(myConf, 5);

    server.refreshServiceAcl(myConf, new TestPolicyProvider());

    TestProtos.EmptyRequestProto emptyRequestProto =
        TestProtos.EmptyRequestProto.newBuilder().build();

    try {
      proxy = getClient(addr, conf);
      proxy.ping(null, emptyRequestProto);
      if (expectFailure) {
        fail("Expect RPC.getProxy to fail with AuthorizationException!");
      }
    } catch (ServiceException e) {
      if (expectFailure) {
        RemoteException re = (RemoteException) e.getCause();
        assertTrue(re.unwrapRemoteException() instanceof AuthorizationException);
        assertEquals("RPC error code should be UNAUTHORIZED",
            RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode());
      } else {
        throw e;
      }
    } finally {
      MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
      if (expectFailure) {
        assertCounter("RpcAuthorizationFailures", 1L, rb);
      } else {
        assertCounter("RpcAuthorizationSuccesses", 1L, rb);
      }
      //since we don't have authentication turned ON, we should see 
      // 0 for the authentication successes and 0 for failure
      assertCounter("RpcAuthenticationFailures", 0L, rb);
      assertCounter("RpcAuthenticationSuccesses", 0L, rb);

      stop(server, proxy);
    }
  }

  @Test
  public void testServerAddress() throws IOException {
    Server server;

    server = setupTestServer(conf, 5);
    try {
      InetSocketAddress bindAddr = NetUtils.getConnectAddress(server);
      assertEquals(InetAddress.getLocalHost(), bindAddr.getAddress());
    } finally {
      stop(server, null);
    }
  }

  @Test
  public void testAuthorization() throws Exception {
    Configuration myConf = new Configuration();
    myConf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
        true);

    // Expect to succeed
    myConf.set(ACL_CONFIG, "*");
    doRPCs(myConf, false);

    // Reset authorization to expect failure
    myConf.set(ACL_CONFIG, "invalid invalid");
    doRPCs(myConf, true);

    myConf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
    // Expect to succeed
    myConf.set(ACL_CONFIG, "*");
    doRPCs(myConf, false);

    // Reset authorization to expect failure
    myConf.set(ACL_CONFIG, "invalid invalid");
    doRPCs(myConf, true);
  }

  /**
   * Switch off setting socketTimeout values on RPC sockets.
   * Verify that RPC calls still work ok.
   */
  public void testNoPings() throws Exception {
    Configuration conf = new Configuration();

    conf.setBoolean("ipc.client.ping", false);
    new TestRPC().testCallsInternal(conf);

    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
    new TestRPC().testCallsInternal(conf);
  }

  /**
   * Test stopping a non-registered proxy
   * @throws IOException
   */
  @Test(expected=HadoopIllegalArgumentException.class)
  public void testStopNonRegisteredProxy() throws IOException {
    RPC.stopProxy(null);
  }

  /**
   * Test that the mockProtocol helper returns mock proxies that can
   * be stopped without error.
   */
  @Test
  public void testStopMockObject() throws IOException {
    RPC.stopProxy(MockitoUtil.mockProtocol(TestProtocol.class));
  }

  @Test
  public void testStopProxy() throws IOException {
    RPC.setProtocolEngine(conf,
        StoppedProtocol.class, StoppedRpcEngine.class);

    StoppedProtocol proxy = RPC.getProxy(StoppedProtocol.class,
        StoppedProtocol.versionID, null, conf);
    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
        Proxy.getInvocationHandler(proxy);
    assertEquals(0, invocationHandler.getCloseCalled());
    RPC.stopProxy(proxy);
    assertEquals(1, invocationHandler.getCloseCalled());
  }

  @Test
  public void testWrappedStopProxy() throws IOException {
    StoppedProtocol wrappedProxy = RPC.getProxy(StoppedProtocol.class,
        StoppedProtocol.versionID, null, conf);
    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
        Proxy.getInvocationHandler(wrappedProxy);

    StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(
        StoppedProtocol.class, wrappedProxy, RetryPolicies.RETRY_FOREVER);

    assertEquals(0, invocationHandler.getCloseCalled());
    RPC.stopProxy(proxy);
    assertEquals(1, invocationHandler.getCloseCalled());
  }

  @Test
  public void testErrorMsgForInsecureClient() throws IOException {
    Server server;
    TestRpcService proxy = null;

    Configuration serverConf = new Configuration(conf);
    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
        serverConf);
    UserGroupInformation.setConfiguration(serverConf);

    server = setupTestServer(serverConf, 5);

    boolean succeeded = false;

    try {
      UserGroupInformation.setConfiguration(conf);
      proxy = getClient(addr, conf);
      proxy.echo(null, newEchoRequest(""));
    } catch (ServiceException e) {
      assertTrue(e.getCause() instanceof RemoteException);
      RemoteException re = (RemoteException) e.getCause();
      LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage());
      assertEquals("RPC error code should be UNAUTHORIZED",
          RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode());
      assertTrue(re.unwrapRemoteException() instanceof AccessControlException);
      succeeded = true;
    } finally {
      stop(server, proxy);
    }
    assertTrue(succeeded);

    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);

    UserGroupInformation.setConfiguration(serverConf);
    server = setupTestServer(serverConf, 5);
    succeeded = false;
    proxy = null;
    try {
      UserGroupInformation.setConfiguration(conf);
      proxy = getClient(addr, conf);
      proxy.echo(null, newEchoRequest(""));
    } catch (ServiceException e) {
      RemoteException re = (RemoteException) e.getCause();
      LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage());
      assertEquals("RPC error code should be UNAUTHORIZED",
          RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode());
      assertTrue(re.unwrapRemoteException() instanceof AccessControlException);
      succeeded = true;
    } finally {
      stop(server, proxy);
    }
    assertTrue(succeeded);
  }

  /**
   * Test that server.stop() properly stops all threads
   */
  @Test
  public void testStopsAllThreads() throws IOException, InterruptedException {
    Server server;

    int threadsBefore = countThreads("Server$Listener$Reader");
    assertEquals("Expect no Reader threads running before test",
        0, threadsBefore);

    server = setupTestServer(conf, 5);

    try {
      // Wait for at least one reader thread to start
      int threadsRunning = 0;
      long totalSleepTime = 0;
      do {
        totalSleepTime += 10;
        Thread.sleep(10);
        threadsRunning = countThreads("Server$Listener$Reader");
      } while (threadsRunning == 0 && totalSleepTime < 5000);

      // Validate that at least one thread started (we didn't timeout)
      threadsRunning = countThreads("Server$Listener$Reader");
      assertTrue(threadsRunning > 0);
    } finally {
      server.stop();
    }

    int threadsAfter = countThreads("Server$Listener$Reader");
    assertEquals("Expect no Reader threads left running after test",
        0, threadsAfter);
  }

  @Test
  public void testRPCBuilder() throws IOException {
    // Test mandatory field conf
    try {
      new RPC.Builder(null).setProtocol(TestProtocol.class)
          .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
          .setNumHandlers(5).setVerbose(true).build();
      fail("Didn't throw HadoopIllegalArgumentException");
    } catch (Exception e) {
      if (!(e instanceof HadoopIllegalArgumentException)) {
        fail("Expecting HadoopIllegalArgumentException but caught " + e);
      }
    }
    // Test mandatory field protocol
    try {
      new RPC.Builder(conf).setInstance(new TestImpl()).setBindAddress(ADDRESS)
          .setPort(0).setNumHandlers(5).setVerbose(true).build();
      fail("Didn't throw HadoopIllegalArgumentException");
    } catch (Exception e) {
      if (!(e instanceof HadoopIllegalArgumentException)) {
        fail("Expecting HadoopIllegalArgumentException but caught " + e);
      }
    }
    // Test mandatory field instance
    try {
      new RPC.Builder(conf).setProtocol(TestProtocol.class)
          .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5)
          .setVerbose(true).build();
      fail("Didn't throw HadoopIllegalArgumentException");
    } catch (Exception e) {
      if (!(e instanceof HadoopIllegalArgumentException)) {
        fail("Expecting HadoopIllegalArgumentException but caught " + e);
      }
    }
  }

  @Test(timeout=90000)
  public void testRPCInterruptedSimple() throws Exception {
    Server server;
    TestRpcService proxy = null;

    RPC.Builder builder = newServerBuilder(conf)
        .setNumHandlers(5).setVerbose(true)
        .setSecretManager(null);

    server = setupTestServer(builder);

    try {
      proxy = getClient(addr, conf);
      // Connect to the server

      proxy.ping(null, newEmptyRequest());
      // Interrupt self, try another call
      Thread.currentThread().interrupt();
      try {
        proxy.ping(null, newEmptyRequest());
        fail("Interruption did not cause IPC to fail");
      } catch (ServiceException se) {
        if (se.toString().contains("InterruptedException") ||
            se.getCause() instanceof InterruptedIOException) {
          // clear interrupt status for future tests
          Thread.interrupted();
          return;
        }
        throw se;
      }
    } finally {
      stop(server, proxy);
    }
  }

  @Test(timeout=30000)
  public void testRPCInterrupted() throws Exception {
    Server server;

    RPC.Builder builder = newServerBuilder(conf)
        .setNumHandlers(5).setVerbose(true)
        .setSecretManager(null);
    server = setupTestServer(builder);

    int numConcurrentRPC = 200;
    final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
    final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
    final AtomicBoolean leaderRunning = new AtomicBoolean(true);
    final AtomicReference<Throwable> error = new AtomicReference<>();
    Thread leaderThread = null;

    try {
      for (int i = 0; i < numConcurrentRPC; i++) {
        final int num = i;
        final TestRpcService proxy = getClient(addr, conf);
        Thread rpcThread = new Thread(new Runnable() {
          @Override
          public void run() {
            try {
              barrier.await();
              while (num == 0 || leaderRunning.get()) {
                proxy.slowPing(null, newSlowPingRequest(false));
              }

              proxy.slowPing(null, newSlowPingRequest(false));
            } catch (Exception e) {
              if (num == 0) {
                leaderRunning.set(false);
              } else {
                error.set(e);
              }

              LOG.error("thread " + num, e);
            } finally {
              latch.countDown();
            }
          }
        });
        rpcThread.start();

        if (leaderThread == null) {
          leaderThread = rpcThread;
        }
      }
      // let threads get past the barrier
      Thread.sleep(1000);
      // stop a single thread
      while (leaderRunning.get()) {
        leaderThread.interrupt();
      }

      latch.await();

      // should not cause any other thread to get an error
      assertTrue("rpc got exception " + error.get(), error.get() == null);
    } finally {
      server.stop();
    }
    // let threads get past the barrier
    Thread.sleep(1000);
    // stop a single thread
    while (leaderRunning.get()) {
      leaderThread.interrupt();
    }
    
    latch.await();
    
    // should not cause any other thread to get an error
    assertTrue("rpc got exception " + error.get(), error.get() == null);
    server.stop();
  }

  @Test
  public void testConnectionPing() throws Exception {
    Server server;
    TestRpcService proxy = null;

    int pingInterval = 50;
    conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
    server = setupTestServer(conf, 5);

    try {
      proxy = getClient(addr, conf);

      proxy.sleep(null, newSleepRequest(pingInterval * 4));
    } finally {
      stop(server, proxy);
    }
  }

  @Test(timeout=30000)
  public void testExternalCall() throws Exception {
    final UserGroupInformation ugi = UserGroupInformation
        .createUserForTesting("user123", new String[0]);
    final IOException expectedIOE = new IOException("boom");

    // use 1 handler so the callq can be plugged
    final Server server = setupTestServer(conf, 1);
    try {
      final AtomicBoolean result = new AtomicBoolean();

      ExternalCall<String> remoteUserCall = newExtCall(ugi,
          new PrivilegedExceptionAction<String>() {
            @Override
            public String run() throws Exception {
              return UserGroupInformation.getCurrentUser().getUserName();
            }
          });

      ExternalCall<String> exceptionCall = newExtCall(ugi,
          new PrivilegedExceptionAction<String>() {
            @Override
            public String run() throws Exception {
              throw expectedIOE;
            }
          });

      final CountDownLatch latch = new CountDownLatch(1);
      final CyclicBarrier barrier = new CyclicBarrier(2);

      ExternalCall<Void> barrierCall = newExtCall(ugi,
          new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
              // notify we are in a handler and then wait to keep the callq
              // plugged up
              latch.countDown();
              barrier.await();
              return null;
            }
          });

      server.queueCall(barrierCall);
      server.queueCall(exceptionCall);
      server.queueCall(remoteUserCall);

      // wait for barrier call to enter the handler, check that the other 2
      // calls are actually queued
      latch.await();
      assertEquals(2, server.getCallQueueLen());

      // unplug the callq
      barrier.await();
      barrierCall.get();

      // verify correct ugi is used
      String answer = remoteUserCall.get();
      assertEquals(ugi.getUserName(), answer);

      try {
        exceptionCall.get();
        fail("didn't throw");
      } catch (ExecutionException ee) {
        assertTrue((ee.getCause()) instanceof IOException);
        assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage());
      }
    } finally {
      server.stop();
    }
  }

  private <T> ExternalCall<T> newExtCall(final UserGroupInformation ugi,
      PrivilegedExceptionAction<T> callable) {
    return new ExternalCall<T>(callable) {
      @Override
      public String getProtocol() {
        return "test";
      }
      @Override
      public UserGroupInformation getRemoteUser() {
        return ugi;
      }
    };
  }

  @Test
  public void testRpcMetrics() throws Exception {
    final Server server;
    TestRpcService proxy = null;

    final int interval = 1;
    conf.setBoolean(CommonConfigurationKeys.
        RPC_METRICS_QUANTILE_ENABLE, true);
    conf.set(CommonConfigurationKeys.
        RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);

    server = setupTestServer(conf, 5);
    String testUser = "testUser";
    UserGroupInformation anotherUser =
        UserGroupInformation.createRemoteUser(testUser);
    TestRpcService proxy2 =
        anotherUser.doAs(new PrivilegedAction<TestRpcService>() {
          public TestRpcService run() {
            try {
              return RPC.getProxy(TestRpcService.class, 0,
                  server.getListenerAddress(), conf);
            } catch (IOException e) {
              e.printStackTrace();
            }
            return null;
          }
        });
    try {
      proxy = getClient(addr, conf);

      for (int i = 0; i < 1000; i++) {
        proxy.ping(null, newEmptyRequest());

        proxy.echo(null, newEchoRequest("" + i));
        proxy2.echo(null, newEchoRequest("" + i));
      }
      MetricsRecordBuilder rpcMetrics =
          getMetrics(server.getRpcMetrics().name());
      assertEquals("Expected correct rpc queue count",
          3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
      assertEquals("Expected correct rpc processing count",
          3000, getLongCounter("RpcProcessingTimeNumOps", rpcMetrics));
      assertEquals("Expected correct rpc lock wait count",
          3000, getLongCounter("RpcLockWaitTimeNumOps", rpcMetrics));
      assertEquals("Expected zero rpc lock wait time",
          0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
      MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
          rpcMetrics);
      MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
          rpcMetrics);
      String actualUserVsCon = MetricsAsserts
          .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
      String proxyUser =
          UserGroupInformation.getCurrentUser().getShortUserName();
      assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
      assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));

      proxy.lockAndSleep(null, newSleepRequest(5));
      rpcMetrics = getMetrics(server.getRpcMetrics().name());
      assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
    } finally {
      if (proxy2 != null) {
        RPC.stopProxy(proxy2);
      }
      stop(server, proxy);
    }
  }

  /**
   *  Test RPC backoff by queue full.
   */
  @Test (timeout=30000)
  public void testClientBackOff() throws Exception {
    Server server;
    final TestRpcService proxy;

    boolean succeeded = false;
    final int numClients = 2;
    final List<Future<Void>> res = new ArrayList<Future<Void>>();
    final ExecutorService executorService =
        Executors.newFixedThreadPool(numClients);
    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
    conf.setBoolean(CommonConfigurationKeys.IPC_NAMESPACE +
        ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
    RPC.Builder builder = newServerBuilder(conf)
        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
    server = setupTestServer(builder);

    @SuppressWarnings("unchecked")
    CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
        .getInternalState(server, "callQueue"));
    Whitebox.setInternalState(server, "callQueue", spy);

    Exception lastException = null;
    proxy = getClient(addr, conf);
    try {
      // start a sleep RPC call to consume the only handler thread.
      // Start another sleep RPC call to make callQueue full.
      // Start another sleep RPC call to make reader thread block on CallQueue.
      for (int i = 0; i < numClients; i++) {
        res.add(executorService.submit(
            new Callable<Void>() {
              @Override
              public Void call() throws ServiceException, InterruptedException {
                proxy.sleep(null, newSleepRequest(100000));
                return null;
              }
            }));
        verify(spy, timeout(500).times(i + 1)).addInternal(
            Mockito.<Call>anyObject(), Mockito.eq(false));
      }
      try {
        proxy.sleep(null, newSleepRequest(100));
      } catch (ServiceException e) {
        RemoteException re = (RemoteException) e.getCause();
        IOException unwrapExeption = re.unwrapRemoteException();
        if (unwrapExeption instanceof RetriableException) {
          succeeded = true;
        } else {
          lastException = unwrapExeption;
        }
      }
    } finally {
      executorService.shutdown();
      stop(server, proxy);
    }
    if (lastException != null) {
      LOG.error("Last received non-RetriableException:", lastException);
    }
    assertTrue("RetriableException not received", succeeded);
  }

  /**
   *  Test RPC backoff by response time of each priority level.
   */
  @Test (timeout=30000)
  public void testClientBackOffByResponseTime() throws Exception {
    final TestRpcService proxy;
    boolean succeeded = false;
    final int numClients = 1;

    GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
    GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);

    final List<Future<Void>> res = new ArrayList<Future<Void>>();
    final ExecutorService executorService =
        Executors.newFixedThreadPool(numClients);
    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);

    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
    Server server = setupDecayRpcSchedulerandTestServer(ns + ".");

    @SuppressWarnings("unchecked")
    CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
        .getInternalState(server, "callQueue"));
    Whitebox.setInternalState(server, "callQueue", spy);

    Exception lastException = null;
    proxy = getClient(addr, conf);

    try {
      // start a sleep RPC call that sleeps 3s.
      for (int i = 0; i < numClients; i++) {
        res.add(executorService.submit(
            new Callable<Void>() {
              @Override
              public Void call() throws ServiceException, InterruptedException {
                proxy.sleep(null, newSleepRequest(3000));
                return null;
              }
            }));
        verify(spy, timeout(500).times(i + 1)).addInternal(
            Mockito.<Call>anyObject(), Mockito.eq(false));
      }
      // Start another sleep RPC call and verify the call is backed off due to
      // avg response time(3s) exceeds threshold (2s).
      try {
        // wait for the 1st response time update
        Thread.sleep(5500);
        proxy.sleep(null, newSleepRequest(100));
      } catch (ServiceException e) {
        RemoteException re = (RemoteException) e.getCause();
        IOException unwrapExeption = re.unwrapRemoteException();
        if (unwrapExeption instanceof RetriableException) {
          succeeded = true;
        } else {
          lastException = unwrapExeption;
        }
      }
    } finally {
      executorService.shutdown();
      stop(server, proxy);
    }
    if (lastException != null) {
      LOG.error("Last received non-RetriableException:", lastException);
    }
    assertTrue("RetriableException not received", succeeded);
  }

  /** Test that the metrics for DecayRpcScheduler are updated. */
  @Test (timeout=30000)
  public void testDecayRpcSchedulerMetrics() throws Exception {
    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
    Server server = setupDecayRpcSchedulerandTestServer(ns + ".");

    MetricsRecordBuilder rb1 =
        getMetrics("DecayRpcSchedulerMetrics2." + ns);
    final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
        "DecayedCallVolume", rb1);
    final long beginRawCallVolume = MetricsAsserts.getLongCounter(
        "CallVolume", rb1);
    final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
        rb1);

    TestRpcService proxy = getClient(addr, conf);
    try {
      for (int i = 0; i < 2; i++) {
        proxy.sleep(null, newSleepRequest(100));
      }

      // Lets Metric system update latest metrics
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          MetricsRecordBuilder rb2 =
              getMetrics("DecayRpcSchedulerMetrics2." + ns);
          long decayedCallVolume1 = MetricsAsserts.getLongCounter(
              "DecayedCallVolume", rb2);
          long rawCallVolume1 = MetricsAsserts.getLongCounter(
              "CallVolume", rb2);
          int uniqueCaller1 = MetricsAsserts.getIntCounter(
              "UniqueCallers", rb2);
          long callVolumePriority0 = MetricsAsserts.getLongGauge(
              "Priority.0.CompletedCallVolume", rb2);
          long callVolumePriority1 = MetricsAsserts.getLongGauge(
              "Priority.1.CompletedCallVolume", rb2);
          double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
              "Priority.0.AvgResponseTime", rb2);
          double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
              "Priority.1.AvgResponseTime", rb2);

          LOG.info("DecayedCallVolume: {}", decayedCallVolume1);
          LOG.info("CallVolume: {}", rawCallVolume1);
          LOG.info("UniqueCaller: {}", uniqueCaller1);
          LOG.info("Priority.0.CompletedCallVolume: {}", callVolumePriority0);
          LOG.info("Priority.1.CompletedCallVolume: {}", callVolumePriority1);
          LOG.info("Priority.0.AvgResponseTime: {}", avgRespTimePriority0);
          LOG.info("Priority.1.AvgResponseTime: {}", avgRespTimePriority1);

          return decayedCallVolume1 > beginDecayedCallVolume &&
              rawCallVolume1 > beginRawCallVolume &&
              uniqueCaller1 > beginUniqueCaller;
        }
      }, 30, 60000);
    } finally {
      stop(server, proxy);
    }
  }

  private Server setupDecayRpcSchedulerandTestServer(String ns)
      throws Exception {
    final int queueSizePerHandler = 3;

    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
    conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
    conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
        "org.apache.hadoop.ipc.FairCallQueue");
    conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
        "org.apache.hadoop.ipc.DecayRpcScheduler");
    conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
        2);
    conf.setBoolean(ns +
            DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
        true);
    // set a small thresholds 2s and 4s for level 0 and level 1 for testing
    conf.set(ns +
            DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY
        , "2s, 4s");

    // Set max queue size to 3 so that 2 calls from the test won't trigger
    // back off because the queue is full.
    RPC.Builder builder = newServerBuilder(conf)
        .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1)
        .setVerbose(true);
    return setupTestServer(builder);
  }

  /**
   *  Test RPC timeout.
   */
  @Test(timeout=30000)
  public void testClientRpcTimeout() throws Exception {
    Server server;
    TestRpcService proxy = null;

    RPC.Builder builder = newServerBuilder(conf)
        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
    server = setupTestServer(builder);

    try {
      // Test RPC timeout with default ipc.client.ping.
      try {
        Configuration c = new Configuration(conf);
        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
        proxy = getClient(addr, c);
        proxy.sleep(null, newSleepRequest(3000));
        fail("RPC should time out.");
      } catch (ServiceException e) {
        assertTrue(e.getCause() instanceof SocketTimeoutException);
        LOG.info("got expected timeout.", e);
      }

      // Test RPC timeout when ipc.client.ping is false.
      try {
        Configuration c = new Configuration(conf);
        c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
        proxy = getClient(addr, c);
        proxy.sleep(null, newSleepRequest(3000));
        fail("RPC should time out.");
      } catch (ServiceException e) {
        assertTrue(e.getCause() instanceof SocketTimeoutException);
        LOG.info("got expected timeout.", e);
      }

      // Test negative timeout value.
      try {
        Configuration c = new Configuration(conf);
        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1);
        proxy = getClient(addr, c);
        proxy.sleep(null, newSleepRequest(2000));
      } catch (ServiceException e) {
        LOG.info("got unexpected exception.", e);
        fail("RPC should not time out.");
      }

      // Test RPC timeout greater than ipc.ping.interval.
      try {
        Configuration c = new Configuration(conf);
        c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
        c.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
        c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
        proxy = getClient(addr, c);

        try {
          // should not time out because effective rpc-timeout is
          // multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
          proxy.sleep(null, newSleepRequest(1300));
        } catch (ServiceException e) {
          LOG.info("got unexpected exception.", e);
          fail("RPC should not time out.");
        }

        proxy.sleep(null, newSleepRequest(2000));
        fail("RPC should time out.");
      } catch (ServiceException e) {
        assertTrue(e.getCause() instanceof SocketTimeoutException);
        LOG.info("got expected timeout.", e);
      }

    } finally {
      stop(server, proxy);
    }
  }

  public static class FakeRequestClass extends RpcWritable {
    static volatile IOException exception;
    @Override
    void writeTo(ResponseBuffer out) throws IOException {
      throw new UnsupportedOperationException();
    }
    @Override
    <T> T readFrom(ByteBuffer bb) throws IOException {
      throw exception;
    }
  }

  @SuppressWarnings("serial")
  public static class TestReaderException extends IOException {
    public TestReaderException(String msg) {
      super(msg);
    }
    @Override
    public boolean equals(Object t) {
      return (t.getClass() == TestReaderException.class) &&
             getMessage().equals(((TestReaderException)t).getMessage());
    }
  }

  @Test (timeout=30000)
  public void testReaderExceptions() throws Exception {
    Server server = null;
    TestRpcService proxy = null;

    // will attempt to return this exception from a reader with and w/o
    // the connection closing.
    IOException expectedIOE = new TestReaderException("testing123");

    @SuppressWarnings("serial")
    IOException rseError = new RpcServerException("keepalive", expectedIOE){
      @Override
      public RpcStatusProto getRpcStatusProto() {
        return RpcStatusProto.ERROR;
      }
    };
    @SuppressWarnings("serial")
    IOException rseFatal = new RpcServerException("disconnect", expectedIOE) {
      @Override
      public RpcStatusProto getRpcStatusProto() {
        return RpcStatusProto.FATAL;
      }
    };

    try {
      RPC.Builder builder = newServerBuilder(conf)
          .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
      server = setupTestServer(builder);
      Whitebox.setInternalState(
          server, "rpcRequestClass", FakeRequestClass.class);
      MutableCounterLong authMetric =
          (MutableCounterLong)Whitebox.getInternalState(
              server.getRpcMetrics(), "rpcAuthorizationSuccesses");

      proxy = getClient(addr, conf);
      boolean isDisconnected = true;
      Connection lastConn = null;
      long expectedAuths = 0;

      // fuzz the client.
      for (int i=0; i < 128; i++) {
        String reqName = "request[" + i + "]";
        int r = ThreadLocalRandom.current().nextInt();
        final boolean doDisconnect = r % 4 == 0;
        LOG.info("TestDisconnect request[" + i + "] " +
                 " shouldConnect=" + isDisconnected +
                 " willDisconnect=" + doDisconnect);
        if (isDisconnected) {
          expectedAuths++;
        }
        try {
          FakeRequestClass.exception = doDisconnect ? rseFatal : rseError;
          proxy.ping(null, newEmptyRequest());
          fail(reqName + " didn't fail");
        } catch (ServiceException e) {
          RemoteException re = (RemoteException)e.getCause();
          assertEquals(reqName, expectedIOE, re.unwrapRemoteException());
        }
        // check authorizations to ensure new connection when expected,
        // then conclusively determine if connections are disconnected
        // correctly.
        assertEquals(reqName, expectedAuths, authMetric.value());
        if (!doDisconnect) {
          // if it wasn't fatal, verify there's only one open connection.
          Connection[] conns = server.getConnections();
          assertEquals(reqName, 1, conns.length);
          // verify whether the connection should have been reused.
          if (isDisconnected) {
            assertNotSame(reqName, lastConn, conns[0]);
          } else {
            assertSame(reqName, lastConn, conns[0]);
          }
          lastConn = conns[0];
        } else if (lastConn != null) {
          // avoid race condition in server where connection may not be
          // fully removed yet.  just make sure it's marked for being closed.
          // the open connection checks above ensure correct behavior.
          assertTrue(reqName, lastConn.shouldClose());
        }
        isDisconnected = doDisconnect;
      }
    } finally {
      stop(server, proxy);
    }
  }

  public static void main(String[] args) throws Exception {
    new TestRPC().testCallsInternal(conf);
  }
}
