HDFS-13547. Add ingress port based sasl resolver. Contributed by Chen Liang.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java
new file mode 100644
index 0000000..a30e4a8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of SaslPropertiesResolver. Used on server side,
+ * returns SASL properties based on the port the client is connecting
+ * to. This should be used along with server side enabling multiple ports
+ * TODO: when NN multiple listener is enabled, automatically use this
+ * resolver without having to set in config.
+ *
+ * For configuration, for example if server runs on two ports 9000 and 9001,
+ * and we want to specify 9000 to use auth-conf and 9001 to use auth.
+ *
+ * We need to set the following configuration properties:
+ * ingress.port.sasl.configured.ports=9000,9001
+ * ingress.port.sasl.prop.9000=privacy
+ * ingress.port.sasl.prop.9001=authentication
+ *
+ * One note is that, if there is misconfiguration that a port, say, 9002 is
+ * given in ingress.port.sasl.configured.ports, but it's sasl prop is not
+ * set, a default of QOP of privacy (auth-conf) will be used. In addition,
+ * if a port is not given even in ingress.port.sasl.configured.ports, but
+ * is being checked in getServerProperties(), the default SASL prop will
+ * be returned. Both of these two cases are considered misconfiguration.
+ */
+public class IngressPortBasedResolver extends SaslPropertiesResolver {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(IngressPortBasedResolver.class.getName());
+
+  static final String INGRESS_PORT_SASL_PROP_PREFIX = "ingress.port.sasl.prop";
+
+  static final String INGRESS_PORT_SASL_CONFIGURED_PORTS =
+      "ingress.port.sasl.configured.ports";
+
+  // no need to concurrent map, because after setConf() it never change,
+  // only for read.
+  private HashMap<Integer, Map<String, String>> portPropMapping;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    portPropMapping = new HashMap<>();
+    Collection<String> portStrings =
+        conf.getTrimmedStringCollection(INGRESS_PORT_SASL_CONFIGURED_PORTS);
+    for (String portString : portStrings) {
+      int port = Integer.parseInt(portString);
+      String configKey = INGRESS_PORT_SASL_PROP_PREFIX + "." + portString;
+      Map<String, String> props = getSaslProperties(conf, configKey,
+          SaslRpcServer.QualityOfProtection.PRIVACY);
+      portPropMapping.put(port, props);
+    }
+    LOG.debug("Configured with port to QOP mapping as:" + portPropMapping);
+  }
+
+  /**
+   * Identify the Sasl Properties to be used for a connection with a client.
+   * @param clientAddress client's address
+   * @param ingressPort the port that the client is connecting
+   * @return the sasl properties to be used for the connection.
+   */
+  @Override
+  @VisibleForTesting
+  public Map<String, String> getServerProperties(InetAddress clientAddress,
+      int ingressPort) {
+    LOG.debug("Resolving SASL properties for " + clientAddress + " "
+        + ingressPort);
+    if (!portPropMapping.containsKey(ingressPort)) {
+      LOG.warn("An un-configured port is being requested " + ingressPort
+          + " using default");
+      return getDefaultProperties();
+    }
+    return portPropMapping.get(ingressPort);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
index 305443ce..64b86e3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.security;
 
 import java.net.InetAddress;
-import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -96,6 +95,17 @@
   }
 
   /**
+   * Identify the Sasl Properties to be used for a connection with a  client.
+   * @param clientAddress  client's address
+   * @param ingressPort the port that the client is connecting
+   * @return the sasl properties to be used for the connection.
+   */
+  public Map<String, String> getServerProperties(InetAddress clientAddress,
+      int ingressPort){
+    return properties;
+  }
+
+  /**
    * Identify the Sasl Properties to be used for a connection with a server.
    * @param serverAddress server's address
    * @return the sasl properties to be used for the connection.
@@ -103,4 +113,39 @@
   public Map<String, String> getClientProperties(InetAddress serverAddress){
     return properties;
   }
+
+  /**
+   * Identify the Sasl Properties to be used for a connection with a server.
+   * @param serverAddress server's address
+   * @param ingressPort the port that is used to connect to server
+   * @return the sasl properties to be used for the connection.
+   */
+  public Map<String, String> getClientProperties(InetAddress serverAddress,
+      int ingressPort) {
+    return properties;
+  }
+
+  /**
+   * A util function to retrieve specific additional sasl property from config.
+   * Used by subclasses to read sasl properties used by themselves.
+   * @param conf the configuration
+   * @param configKey the config key to look for
+   * @param defaultQOP the default QOP if the key is missing
+   * @return sasl property associated with the given key
+   */
+  static Map<String, String> getSaslProperties(Configuration conf,
+      String configKey, QualityOfProtection defaultQOP) {
+    Map<String, String> saslProps = new TreeMap<>();
+    String[] qop = conf.getStrings(configKey, defaultQOP.toString());
+
+    for (int i=0; i < qop.length; i++) {
+      qop[i] = QualityOfProtection.valueOf(
+          StringUtils.toUpperCase(qop[i])).getSaslQop();
+    }
+
+    saslProps.put(Sasl.QOP, StringUtils.join(",", qop));
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+
+    return saslProps;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java
index a64c4de..5964886 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java
@@ -20,15 +20,10 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
-import java.util.TreeMap;
-
-import javax.security.sasl.Sasl;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
 import org.apache.hadoop.util.CombinedIPWhiteList;
-import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,18 +129,7 @@
   }
 
   static Map<String, String> getSaslProperties(Configuration conf) {
-    Map<String, String> saslProps =new TreeMap<String, String>();
-    String[] qop = conf.getStrings(HADOOP_RPC_PROTECTION_NON_WHITELIST,
-        QualityOfProtection.PRIVACY.toString());
-
-    for (int i=0; i < qop.length; i++) {
-      qop[i] = QualityOfProtection.valueOf(
-          StringUtils.toUpperCase(qop[i])).getSaslQop();
-    }
-
-    saslProps.put(Sasl.QOP, StringUtils.join(",", qop));
-    saslProps.put(Sasl.SERVER_AUTH, "true");
-
-    return saslProps;
+    return getSaslProperties(conf, HADOOP_RPC_PROTECTION_NON_WHITELIST,
+        QualityOfProtection.PRIVACY);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java
new file mode 100644
index 0000000..96c80af
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import javax.security.sasl.Sasl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test class for IngressPortBasedResolver.
+ */
+public class TestIngressPortBasedResolver {
+
+  /**
+   * A simple test to test that for the configured ports, the resolver
+   * can return the current SASL properties.
+   */
+  @Test
+  public void testResolver() {
+    Configuration conf = new Configuration();
+    conf.set("ingress.port.sasl.configured.ports", "444,555,666,777");
+    conf.set("ingress.port.sasl.prop.444", "authentication");
+    conf.set("ingress.port.sasl.prop.555", "authentication,privacy");
+    conf.set("ingress.port.sasl.prop.666", "privacy");
+
+    IngressPortBasedResolver resolver = new IngressPortBasedResolver();
+    resolver.setConf(conf);
+
+    // the client address does not matter, give it a null
+    assertEquals("auth",
+        resolver.getServerProperties(null, 444).get(Sasl.QOP));
+    assertEquals("auth,auth-conf",
+        resolver.getServerProperties(null, 555).get(Sasl.QOP));
+    assertEquals("auth-conf",
+        resolver.getServerProperties(null, 666).get(Sasl.QOP));
+    assertEquals("auth-conf",
+        resolver.getServerProperties(null, 777).get(Sasl.QOP));
+    assertEquals("auth",
+        resolver.getServerProperties(null, 888).get(Sasl.QOP));
+  }
+}