Revert "AMBARI-17694. Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (Anita Jebaraj via rlevas)"
This reverts commit 124f48ef899fddb6bdb96ebea9aa3a6a1a6adbca.
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
index d472b79..66be3bf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
@@ -20,6 +20,7 @@
import com.google.inject.Singleton;
import org.apache.ambari.server.AmbariException;
+
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
@@ -48,7 +49,6 @@
{
put("each", new EachFunction());
put("toLower", new ToLowerFunction());
- put("replace", new ReplaceValue());
}
};
@@ -226,37 +226,7 @@
return "";
}
}
- /**
- * ReplaceValue is a Function implementation that replaces the value in the string
- * <p/>
- * This function expects the following arguments (in order) within the args array:
- * <ol>
- * <li>regular expression that should be replaced</li>
- * <li>replacement value for the string</li>
- * </ol>
- */
- private static class ReplaceValue implements Function {
-
- @Override
- public String perform(String[] args, String data) {
- if ((args == null) || (args.length != 2)) {
- throw new IllegalArgumentException("Invalid number of arguments encountered");
- }
- if (data != null) {
- StringBuffer builder = new StringBuffer();
- String regex = args[0];
- String replacement = args[1];
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(data);
- while(matcher.find()) {
- matcher.appendReplacement(builder, replacement);
- }
- matcher.appendTail(builder);
- return builder.toString();
- }
- return "";
- }
- }
+
/**
* ToLowerFunction is a Function implementation that converts a String to lowercase
*/
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
index d74855b..ac7b0ae 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
@@ -80,16 +80,21 @@
listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
Logger.info(format("Kafka listeners: {listeners}"))
- kafka_server_config['listeners'] = listeners
if params.security_enabled and params.kafka_kerberos_enabled:
Logger.info("Kafka kerberos security is enabled.")
+ if "SASL" not in listeners:
+ listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
+
+ kafka_server_config['listeners'] = listeners
kafka_server_config['advertised.listeners'] = listeners
Logger.info(format("Kafka advertised listeners: {listeners}"))
- elif 'advertised.listeners' in kafka_server_config:
- advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
- kafka_server_config['advertised.listeners'] = advertised_listeners
- Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+ else:
+ kafka_server_config['listeners'] = listeners
+ if 'advertised.listeners' in kafka_server_config:
+ advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+ kafka_server_config['advertised.listeners'] = advertised_listeners
+ Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
else:
kafka_server_config['host.name'] = params.hostname
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
index ab1ed1f..2b1c01b 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
@@ -14,8 +14,7 @@
"principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
"super.users": "user:${kafka-env/kafka_user}",
"security.inter.broker.protocol": "PLAINTEXTSASL",
- "zookeeper.set.acl": "true",
- "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
+ "zookeeper.set.acl": "true"
}
}
],
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
index 8be0eb9..ee2a671 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
@@ -158,10 +158,6 @@
put("realm", "UNIT.TEST");
}});
- put("kafka-broker", new HashMap<String, String>() {{
- put("listeners", "PLAINTEXT://localhost:6667");
- }});
-
put("clusterHostInfo", new HashMap<String, String>() {{
put("hive_metastore_host", "host1.unit.test, host2.unit.test , host3.unit.test"); // spaces are there on purpose.
}});
@@ -175,8 +171,6 @@
helper.replaceVariables("hive.metastore.local=false,hive.metastore.uris=${clusterHostInfo/hive_metastore_host | each(thrift://%s:9083, \\\\,, \\s*\\,\\s*)},hive.metastore.sasl.enabled=true,hive.metastore.execute.setugi=true,hive.metastore.warehouse.dir=/apps/hive/warehouse,hive.exec.mode.local.auto=false,hive.metastore.kerberos.principal=hive/_HOST@${realm}", configurations));
Assert.assertEquals("test=unit.test", helper.replaceVariables("test=${realm|toLower()}", configurations));
-
- Assert.assertEquals("PLAINTEXTSASL://localhost:6667", helper.replaceVariables("${kafka-broker/listeners|replace(\\bPLAINTEXT\\b,PLAINTEXTSASL)}", configurations));
}
-}
+}
\ No newline at end of file