ATLAS-4788 : Kafka password is in clear text in application.properties

Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index 1674422..672caa8 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.utils;
 
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.security.SecurityUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -43,6 +44,7 @@
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import static org.apache.atlas.security.SecurityProperties.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
 
 public class KafkaUtils implements AutoCloseable {
 
@@ -62,6 +64,9 @@
     public static final String ATLAS_KAFKA_PROPERTY_PREFIX     = "atlas.kafka";
     public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
 
+    public static final String JAAS_PASSWORD_SUFFIX            = "password";
+    private static final String JAAS_MASK_PASSWORD             = "********";
+
     final protected Properties  kafkaConfiguration;
     final protected AdminClient adminClient;
     final protected boolean     importInternalTopics;
@@ -254,6 +259,7 @@
 
             String       optionPrefix       = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
             String       principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+            String       passwordOptionKey  = optionPrefix + JAAS_PASSWORD_SUFFIX;
             int          optionPrefixLen    = optionPrefix.length();
             StringBuffer optionStringBuffer = new StringBuffer();
 
@@ -271,7 +277,16 @@
                         } catch (IOException e) {
                             LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal);
                         }
-
+                        if (key.equalsIgnoreCase(passwordOptionKey)) {
+                            String jaasKafkaClientConfigurationProperty = "atlas.jaas.KafkaClient.option.password";
+                            if (JAAS_MASK_PASSWORD.equals(configuration.getString(jaasKafkaClientConfigurationProperty))) {
+                                try {
+                                    optionVal = SecurityUtil.getPassword(configuration, jaasKafkaClientConfigurationProperty, HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH);
+                                } catch (Exception e) {
+                                    LOG.error("Error in getting secure password ", e);
+                                }
+                            }
+                        }
                         optionVal = surroundWithQuotes(optionVal);
 
                         optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
index 562e28a..9b4f093 100644
--- a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
+++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
@@ -20,13 +20,20 @@
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.security.JaasContext;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
+
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -37,6 +44,11 @@
 
 public class KafkaUtilsTest {
 
+    protected Path jksPath;
+    protected String providerUrl;
+
+    protected static final String JAAS_MASKED_PASSWORD = "keypass";
+
     @Test
     public void testSetKafkaJAASPropertiesForAllProperValues() {
         Properties properties = new Properties();
@@ -262,6 +274,94 @@
         }
     }
 
+    @Test
+    public void testSetKafkaJAASPropertiesForClearTextPassword() throws Exception {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        setupCredentials();
+        final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "false";
+        final String optionServiceName = "kafka";
+        final String optionTokenAuth = "true";
+        final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
+        final String optionPassword = "admin123";
+
+        configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100");
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword);
+        configuration.setProperty("hadoop.security.credential.provider.path", providerUrl);
+
+        try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) {
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod();
+
+            KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+            String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("password=\"" + optionPassword + "\""), "password not present in new property or value doesn't match");
+            assertJaaSConfigLoadable(newPropertyValue);
+        }
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForPasswordEncryption() throws Exception {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        setupCredentials();
+        final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+         final String optionStoreKey = "false";
+        final String optionServiceName = "kafka";
+        final String optionTokenAuth = "true";
+        final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
+        final String optionPassword = "********";
+
+        configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100");
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword);
+        configuration.setProperty("hadoop.security.credential.provider.path", providerUrl);
+
+        try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) {
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod();
+
+            KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+            String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("password=\"" + JAAS_MASKED_PASSWORD + "\""), "password not present in new property or value doesn't match");
+            assertJaaSConfigLoadable(newPropertyValue);
+        }
+    }
+
     private void assertJaaSConfigLoadable(String jaasConfig) {
         // Ensure that JaaS config can be loaded
         Map<String, Password> jaasConfigs = new HashMap<>();
@@ -273,6 +373,27 @@
         }
     }
 
+    protected void setupCredentials() throws Exception {
+        jksPath = new Path(Files.createTempDirectory("tempproviders").toString(), "kafka.jceks");
+        providerUrl = JavaKeyStoreProvider.SCHEME_NAME + "://file/" + jksPath.toUri();
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(false);
+
+        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, providerUrl);
+
+        CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0);
+
+        // create new aliases
+        try {
+            provider.createCredentialEntry("atlas.jaas.KafkaClient.option.password", JAAS_MASKED_PASSWORD.toCharArray());
+
+
+            // write out so that it can be found in checks
+            provider.flush();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
 
 }
-