NIFI-8978 Add KerberosUserService to DBCPConnectionPool/HadoopDBCPConnectionPool

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5399.
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
index 04ebde4..9189bf4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
@@ -58,6 +58,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record-sink-api</artifactId>
             <version>1.15.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index 945412d..48141a8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -37,6 +37,7 @@
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
@@ -265,6 +266,14 @@
             .required(false)
             .build();
 
+    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
     public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
             .name("kerberos-principal")
             .displayName("Kerberos Principal")
@@ -291,6 +300,7 @@
         props.add(DATABASE_URL);
         props.add(DB_DRIVERNAME);
         props.add(DB_DRIVER_LOCATION);
+        props.add(KERBEROS_USER_SERVICE);
         props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(KERBEROS_PRINCIPAL);
         props.add(KERBEROS_PASSWORD);
@@ -359,6 +369,7 @@
         }
 
         final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
 
         if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
             results.add(new ValidationResult.Builder()
@@ -368,6 +379,22 @@
                     .build());
         }
 
+        if (kerberosUserService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(KERBEROS_USER_SERVICE.getDisplayName())
+                    .valid(false)
+                    .explanation("kerberos principal/password and kerberos user service cannot be configured at the same time")
+                    .build());
+        }
+
+        if (kerberosUserService != null && kerberosCredentialsService != null) {
+            results.add(new ValidationResult.Builder()
+                    .subject(KERBEROS_USER_SERVICE.getDisplayName())
+                    .valid(false)
+                    .explanation("kerberos user service and kerberos credential service cannot be configured at the same time")
+                    .build());
+        }
+
         return results;
     }
 
@@ -402,10 +429,13 @@
         final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
         final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
         final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
         final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
         final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
 
-        if (kerberosCredentialsService != null) {
+        if (kerberosUserService != null) {
+            kerberosUser = kerberosUserService.createKerberosUser();
+        } else if (kerberosCredentialsService != null) {
             kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
         } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
             kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
@@ -497,7 +527,7 @@
      * no exception while closing open connections
      */
     @OnDisabled
-    public void shutdown() throws SQLException, LoginException {
+    public void shutdown() throws SQLException {
         try {
             if (kerberosUser != null) {
                 kerberosUser.logout();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
index 0ca1191..d53f8e3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
@@ -59,6 +59,7 @@
 import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE
 import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
 import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL
+import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_USER_SERVICE
 import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_CONN_LIFETIME
 import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_IDLE
 import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_TOTAL_CONNECTIONS
@@ -306,6 +307,7 @@
         when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
         when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
         when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new MockPropertyValue(null))
+        when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new MockPropertyValue(null))
         when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new MockPropertyValue(null))
         when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new MockPropertyValue(null))
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
index a9f6ef7..7042a96 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.dbcp;
 
 import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.kerberos.MockKerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
@@ -39,6 +40,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DBCPServiceTest {
     private static final String SERVICE_ID = DBCPConnectionPool.class.getName();
@@ -70,6 +73,38 @@
     }
 
     @Test
+    public void testCustomValidateOfKerberosProperties() throws InitializationException {
+        // direct principal + password and no kerberos services is valid
+        runner.setProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL, "foo@FOO.COM");
+        runner.setProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD, "fooPassword");
+        runner.assertValid(service);
+
+        // direct principal + password with kerberos credential service is invalid
+        final KerberosCredentialsService kerberosCredentialsService = enabledKerberosCredentialsService(runner);
+        runner.setProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, kerberosCredentialsService.getIdentifier());
+        runner.assertNotValid(service);
+
+        // kerberos credential service by itself is valid
+        runner.removeProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL);
+        runner.removeProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD);
+        runner.assertValid(service);
+
+        // kerberos credential service with kerberos user service is invalid
+        final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
+        runner.setProperty(service, DBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+        runner.assertNotValid(service);
+
+        // kerberos user service by itself is valid
+        runner.removeProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE);
+        runner.assertValid(service);
+
+        // kerberos user service with direct principal + password is invalid
+        runner.setProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL, "foo@FOO.COM");
+        runner.setProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD, "fooPassword");
+        runner.assertNotValid(service);
+    }
+
+    @Test
     public void testNotValidWithNegativeMinIdleProperty() {
         runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1");
         runner.assertNotValid(service);
@@ -249,4 +284,24 @@
             assertNotNull(connection);
         }
     }
+
+    private KerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException {
+        final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
+        when(kerberosUserService.getIdentifier()).thenReturn("userService1");
+        runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
+        runner.enableControllerService(kerberosUserService);
+        return kerberosUserService;
+    }
+
+    private KerberosCredentialsService enabledKerberosCredentialsService(final TestRunner runner) throws InitializationException {
+        final KerberosCredentialsService credentialsService = mock(KerberosCredentialsService.class);
+        when(credentialsService.getIdentifier()).thenReturn("credsService1");
+        when(credentialsService.getPrincipal()).thenReturn("principal1");
+        when(credentialsService.getKeytab()).thenReturn("keytab1");
+
+        runner.addControllerService(credentialsService.getIdentifier(), credentialsService);
+        runner.enableControllerService(credentialsService);
+        return credentialsService;
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
index 0727b95..f4ecfb2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
@@ -55,8 +55,10 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-credentials-service-api</artifactId>
-            <version>1.15.0-SNAPSHOT</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
index 614d5f8..eb97cf5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
@@ -40,6 +40,7 @@
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
@@ -263,6 +264,14 @@
             .required(false)
             .build();
 
+    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
 
     private File kerberosConfigFile = null;
     private KerberosProperties kerberosProperties;
@@ -286,6 +295,7 @@
         props.add(DB_DRIVERNAME);
         props.add(DB_DRIVER_LOCATION);
         props.add(HADOOP_CONFIGURATION_RESOURCES);
+        props.add(KERBEROS_USER_SERVICE);
         props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(kerberosProperties.getKerberosPrincipal());
         props.add(kerberosProperties.getKerberosKeytab());
@@ -357,6 +367,7 @@
         final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
         final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
         final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        final KerberosUserService kerberosUserService = validationContext.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
 
         final String resolvedPrincipal;
         final String resolvedKeytab;
@@ -382,9 +393,15 @@
             }
 
             final Configuration hadoopConfig = resources.getConfiguration();
-
-            problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
-                    resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+            if (kerberosUserService == null) {
+                problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
+                        resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+            } else {
+                final boolean securityEnabled = SecurityUtil.isSecurityEnabled(hadoopConfig);
+                if (!securityEnabled) {
+                    getLogger().warn("Hadoop Configuration does not have security enabled, KerberosUserService will be ignored");
+                }
+            }
         }
 
         if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
@@ -395,6 +412,22 @@
                     .build());
         }
 
+        if (kerberosUserService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
+            problems.add(new ValidationResult.Builder()
+                    .subject("Kerberos User")
+                    .valid(false)
+                    .explanation("Cannot specify a Kerberos User Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
+                    .build());
+        }
+
+        if (kerberosUserService != null && credentialsService != null) {
+            problems.add(new ValidationResult.Builder()
+                    .subject("Kerberos User")
+                    .valid(false)
+                    .explanation("Cannot specify a Kerberos User Service while also specifying a Kerberos Credentials Service")
+                    .build());
+        }
+
         if (!isAllowExplicitKeytab() && explicitKeytab != null) {
             problems.add(new ValidationResult.Builder()
                     .subject("Kerberos Credentials")
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
index 644c669..0ca543a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
@@ -20,6 +20,7 @@
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.kerberos.KerberosContext;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockKerberosContext;
@@ -30,6 +31,9 @@
 
 import java.io.File;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class HadoopDBCPConnectionPoolTest {
 
     private File krbConfFile;
@@ -75,7 +79,7 @@
         runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
         runner.assertValid(hadoopDBCPService);
 
-        // Configure a KeberosCredentialService, should become invalid
+        // Configure a KerberosCredentialService, should become invalid
         final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService(
                 "nifi@EXAMPLE.COM", "src/test/resources/fake.keytab");
         runner.addControllerService("kerb-credentials", kerberosCredentialsService);
@@ -90,6 +94,32 @@
         // Remove principal property, only using keytab service, should become valid
         runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal());
         runner.assertValid(hadoopDBCPService);
+
+        // Configure KerberosUserService, should be invalid since KerberosCredentialService also configured
+        final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
+        when(kerberosUserService.getIdentifier()).thenReturn("userService1");
+        runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
+        runner.enableControllerService(kerberosUserService);
+        runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+        runner.assertNotValid(hadoopDBCPService);
+
+        // Remove KerberosCredentialService, should be valid with only KerberosUserService
+        runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE);
+        runner.assertValid(hadoopDBCPService);
+
+        // Configure explicit principal and keytab, should be invalid while kerberos user service is set
+        runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "nifi@EXAMPLE.COM");
+        runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab(), "src/test/resources/fake.keytab");
+        runner.assertNotValid(hadoopDBCPService);
+
+        // Remove explicit keytab, set explicit password, still invalid while kerberos user service set
+        runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
+        runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPassword(), "password");
+        runner.assertNotValid(hadoopDBCPService);
+
+        // Remove kerberos user service, should be valid
+        runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE);
+        runner.assertValid(hadoopDBCPService);
     }
 
     @Test