HADOOP-18406: Adds alignment context to call path for creating RPC proxy with multiple connections per user.

Fixes #4748

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index d904aa8..df0f734 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -80,9 +80,9 @@
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+      ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext) throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
   }
@@ -126,7 +126,7 @@
     return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
         (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
             new Class[] { protocol }, new Invoker(protocol, connId, conf,
-                factory)), false);
+                factory, null)), false);
   }
 
   protected static class Invoker implements RpcInvocationHandler {
@@ -147,9 +147,8 @@
         throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
+          conf, factory, alignmentContext);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
     }
     
     /**
@@ -158,14 +157,16 @@
      * @param connId input connId.
      * @param conf input Configuration.
      * @param factory input factory.
+     * @param alignmentContext Alignment context
      */
     protected Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
+        Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
       this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
       this.protocolName = RPC.getProtocolName(protocol);
       this.clientProtocolVersion = RPC
           .getProtocolVersion(protocol);
+      this.alignmentContext = alignmentContext;
     }
 
     private RequestHeaderProto constructRpcRequestHeader(Method method) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
index 336bf06..bedecc8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
@@ -103,9 +103,9 @@
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+      ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext) throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
   }
@@ -133,7 +133,7 @@
     return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
         (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
             new Class[]{protocol}, new Invoker(protocol, connId, conf,
-                factory)), false);
+                factory, null)), false);
   }
 
   protected static class Invoker implements RpcInvocationHandler {
@@ -154,9 +154,8 @@
         throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
+          conf, factory, alignmentContext);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
     }
 
     /**
@@ -166,14 +165,16 @@
      * @param connId input connId.
      * @param conf input Configuration.
      * @param factory input factory.
+     * @param alignmentContext Alignment context
      */
     protected Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
+        Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
       this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
       this.protocolName = RPC.getProtocolName(protocol);
       this.clientProtocolVersion = RPC
           .getProtocolVersion(protocol);
+      this.alignmentContext = alignmentContext;
     }
 
     private RequestHeaderProto constructRpcRequestHeader(Method method) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 7f35b13..fc562b5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -558,11 +558,32 @@
   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
       long clientVersion, ConnectionId connId, Configuration conf,
       SocketFactory factory) throws IOException {
+    return getProtocolProxy(protocol, clientVersion, connId, conf,
+        factory, null);
+  }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server.
+   *
+   * @param <T> Generics Type T
+   * @param protocol protocol class
+   * @param clientVersion client's version
+   * @param connId client connection identifier
+   * @param conf configuration
+   * @param factory socket factory
+   * @param alignmentContext StateID alignment context
+   * @return the protocol proxy
+   * @throws IOException if the far end through a RemoteException
+   */
+  public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+      long clientVersion, ConnectionId connId, Configuration conf,
+      SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
     return getProtocolEngine(protocol, conf).getProxy(
-        protocol, clientVersion, connId, conf, factory);
+        protocol, clientVersion, connId, conf, factory, alignmentContext);
   }
   
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index 1f0ff2d..f322f6e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -66,11 +66,13 @@
    * @param connId input ConnectionId.
    * @param conf input Configuration.
    * @param factory input factory.
+   * @param alignmentContext Alignment context
    * @throws IOException raised on errors performing I/O.
    * @return ProtocolProxy.
    */
   <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext)
       throws IOException;
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 3e4ee70..d92bcea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -315,16 +315,18 @@
    * @param connId input ConnectionId.
    * @param conf input Configuration.
    * @param factory input factory.
+   * @param alignmentContext Alignment context
    * @throws IOException raised on errors performing I/O.
    * @return ProtocolProxy.
    */
   @Override
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext)
       throws IOException {
     return getProxy(protocol, clientVersion, connId.getAddress(),
         connId.getTicket(), conf, factory, connId.getRpcTimeout(),
-        connId.getRetryPolicy(), null, null);
+        connId.getRetryPolicy(), null, alignmentContext);
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index ed11c9c..7201b28 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -293,7 +293,8 @@
 
     @Override
     public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-        ConnectionId connId, Configuration conf, SocketFactory factory)
+        ConnectionId connId, Configuration conf, SocketFactory factory,
+        AlignmentContext alignmentContext)
         throws IOException {
       throw new UnsupportedOperationException("This proxy is not supported");
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 7635b16..5b5c8bb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -189,7 +189,8 @@
           0,
           connId,
           clientConf,
-          NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+          NetUtils.getDefaultSocketFactory(clientConf),
+          null).getProxy();
     } catch (IOException e) {
       throw new ServiceException(e);
     }