HDFS-16669: Enhance client protocol to propagate last seen state IDs for multiple nameservices.

Fixes #4584

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index 3d30923..8d43fd7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -46,7 +46,7 @@
   void updateResponseState(RpcResponseHeaderProto.Builder header);
 
   /**
-   * This is the intended client method call to implement to recieve state info
+   * This is the intended client method call to implement to receive state info
    * during RPC response processing.
    *
    * @param header The RPC response header.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 9e622a8..ec624cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -925,7 +925,7 @@
     private volatile String detailedMetricsName = "";
     final int callId;            // the client's call id
     final int retryCount;        // the retry count of the call
-    long timestampNanos;         // time the call was received
+    private final long timestampNanos; // time the call was received
     long responseTimestampNanos; // time the call was served
     private AtomicInteger responseWaitCount = new AtomicInteger(1);
     final RPC.RpcKind rpcKind;
@@ -1107,6 +1107,10 @@
 
     public void setDeferredError(Throwable t) {
     }
+
+    public long getTimestampNanos() {
+      return timestampNanos;
+    }
   }
 
   /** A RPC extended call queued for handling. */
@@ -1188,7 +1192,7 @@
 
       try {
         value = call(
-            rpcKind, connection.protocolName, rpcRequest, timestampNanos);
+            rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
       } catch (Throwable e) {
         populateResponseParamsOnError(e, responseParams);
       }
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index 042928c..d9becf7 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -91,6 +91,10 @@
   optional RPCTraceInfoProto traceInfo = 6; // tracing info
   optional RPCCallerContextProto callerContext = 7; // call context
   optional int64 stateId = 8; // The last seen Global State ID
+  // Alignment context info for use with routers.
+  // The client should not interpret these bytes, but only forward bytes
+  // received from RpcResponseHeaderProto.routerFederatedState.
+  optional bytes routerFederatedState = 9;
 }
 
 
@@ -157,6 +161,10 @@
   optional bytes clientId = 7; // Globally unique client ID
   optional sint32 retryCount = 8 [default = -1];
   optional int64 stateId = 9; // The last written Global State ID
+  // Alignment context info for use with routers.
+  // The client should not interpret these bytes, but only
+  // forward them to the router using RpcRequestHeaderProto.routerFederatedState.
+  optional bytes routerFederatedState = 10;
 }
 
 message RpcSaslProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index 336130e..7f61d80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -312,3 +312,16 @@
 message GetDisabledNameservicesResponseProto {
   repeated string nameServiceIds = 1;
 }
+
+/////////////////////////////////////////////////
+// Alignment state for namespaces.
+/////////////////////////////////////////////////
+
+/**
+ * Clients should receive this message in RPC responses and forward it
+ * in RPC requests without interpreting it. It should be encoded
+ * as an obscure byte array when being sent to clients.
+ */
+message RouterFederatedStateProto {
+  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
new file mode 100644
index 0000000..60ab4b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.util.ProtoUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestRouterFederatedState {
+
+  @Test
+  public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
+    byte[] uuid = ClientId.getClientId();
+    Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
+      put("namespace1", 11L );
+      put("namespace2", 22L);
+    }};
+
+    AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);
+
+    RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
+        RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);
+
+    Map<String, Long> stateIdsFromHeader =
+        RouterFederatedStateProto.parseFrom(
+            header.getRouterFederatedState().toByteArray()
+        ).getNamespaceStateIdsMap();
+
+    assertEquals(expectedStateIds, stateIdsFromHeader);
+  }
+
+  private static class AlignmentContextWithRouterState implements AlignmentContext {
+
+    Map<String, Long> routerFederatedState;
+
+    public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
+      this.routerFederatedState = namespaceStates;
+    }
+
+    @Override
+    public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
+      RouterFederatedStateProto fedState = RouterFederatedStateProto
+          .newBuilder()
+          .putAllNamespaceStateIds(routerFederatedState)
+          .build();
+
+      header.setRouterFederatedState(fedState.toByteString());
+    }
+
+    @Override
+    public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {}
+
+    @Override
+    public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}
+
+    @Override
+    public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public long getLastSeenStateId() {
+      return 0;
+    }
+
+    @Override
+    public boolean isCoordinatedCall(String protocolName, String method) {
+      return false;
+    }
+  }
+}
\ No newline at end of file