HDFS-16669: Enhance client protocol to propagate last seen state IDs for multiple nameservices.
Fixes #4584
Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index 3d30923..8d43fd7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -46,7 +46,7 @@
void updateResponseState(RpcResponseHeaderProto.Builder header);
/**
- * This is the intended client method call to implement to recieve state info
+ * This is the intended client method call to implement to receive state info
* during RPC response processing.
*
* @param header The RPC response header.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 9e622a8..ec624cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -925,7 +925,7 @@
private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
- long timestampNanos; // time the call was received
+ private final long timestampNanos; // time the call was received
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
@@ -1107,6 +1107,10 @@
public void setDeferredError(Throwable t) {
}
+
+ public long getTimestampNanos() {
+ return timestampNanos;
+ }
}
/** A RPC extended call queued for handling. */
@@ -1188,7 +1192,7 @@
try {
value = call(
- rpcKind, connection.protocolName, rpcRequest, timestampNanos);
+ rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index 042928c..d9becf7 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -91,6 +91,10 @@
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
optional int64 stateId = 8; // The last seen Global State ID
+ // Alignment context info for use with routers.
+ // The client should not interpret these bytes, but only forward bytes
+ // received from RpcResponseHeaderProto.routerFederatedState.
+ optional bytes routerFederatedState = 9;
}
@@ -157,6 +161,10 @@
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
+ // Alignment context info for use with routers.
+ // The client should not interpret these bytes, but only
+ // forward them to the router using RpcRequestHeaderProto.routerFederatedState.
+ optional bytes routerFederatedState = 10;
}
message RpcSaslProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index 336130e..7f61d80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -312,3 +312,16 @@
message GetDisabledNameservicesResponseProto {
repeated string nameServiceIds = 1;
}
+
+/////////////////////////////////////////////////
+// Alignment state for namespaces.
+/////////////////////////////////////////////////
+
+/**
+ * Clients should receive this message in RPC responses and forward it
+ * in RPC requests without interpreting it. It should be encoded
+ * as an obscure byte array when being sent to clients.
+ */
+message RouterFederatedStateProto {
+ map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
new file mode 100644
index 0000000..60ab4b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.util.ProtoUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestRouterFederatedState {
+
+ @Test
+ public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
+ byte[] uuid = ClientId.getClientId();
+ Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
+ put("namespace1", 11L );
+ put("namespace2", 22L);
+ }};
+
+ AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);
+
+ RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
+ RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);
+
+ Map<String, Long> stateIdsFromHeader =
+ RouterFederatedStateProto.parseFrom(
+ header.getRouterFederatedState().toByteArray()
+ ).getNamespaceStateIdsMap();
+
+ assertEquals(expectedStateIds, stateIdsFromHeader);
+ }
+
+ private static class AlignmentContextWithRouterState implements AlignmentContext {
+
+ Map<String, Long> routerFederatedState;
+
+ public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
+ this.routerFederatedState = namespaceStates;
+ }
+
+ @Override
+ public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
+ RouterFederatedStateProto fedState = RouterFederatedStateProto
+ .newBuilder()
+ .putAllNamespaceStateIds(routerFederatedState)
+ .build();
+
+ header.setRouterFederatedState(fedState.toByteString());
+ }
+
+ @Override
+ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {}
+
+ @Override
+ public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}
+
+ @Override
+ public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getLastSeenStateId() {
+ return 0;
+ }
+
+ @Override
+ public boolean isCoordinatedCall(String protocolName, String method) {
+ return false;
+ }
+ }
+}
\ No newline at end of file