NIFI-8978 Add KerberosUserService to DBCPConnectionPool/HadoopDBCPConnectionPool
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5399.
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
index 04ebde4..9189bf4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
@@ -58,6 +58,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index 945412d..48141a8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -37,6 +37,7 @@
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
@@ -265,6 +266,14 @@
.required(false)
.build();
+ public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-user-service")
+ .displayName("Kerberos User Service")
+ .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosUserService.class)
+ .required(false)
+ .build();
+
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
@@ -291,6 +300,7 @@
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
+ props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
@@ -359,6 +369,7 @@
}
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
results.add(new ValidationResult.Builder()
@@ -368,6 +379,22 @@
.build());
}
+ if (kerberosUserService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
+ results.add(new ValidationResult.Builder()
+ .subject(KERBEROS_USER_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation("kerberos principal/password and kerberos user service cannot be configured at the same time")
+ .build());
+ }
+
+ if (kerberosUserService != null && kerberosCredentialsService != null) {
+ results.add(new ValidationResult.Builder()
+ .subject(KERBEROS_USER_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation("kerberos user service and kerberos credential service cannot be configured at the same time")
+ .build());
+ }
+
return results;
}
@@ -402,10 +429,13 @@
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
- if (kerberosCredentialsService != null) {
+ if (kerberosUserService != null) {
+ kerberosUser = kerberosUserService.createKerberosUser();
+ } else if (kerberosCredentialsService != null) {
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
@@ -497,7 +527,7 @@
* no exception while closing open connections
*/
@OnDisabled
- public void shutdown() throws SQLException, LoginException {
+ public void shutdown() throws SQLException {
try {
if (kerberosUser != null) {
kerberosUser.logout();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
index 0ca1191..d53f8e3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
@@ -59,6 +59,7 @@
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL
+import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_USER_SERVICE
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_CONN_LIFETIME
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_IDLE
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_TOTAL_CONNECTIONS
@@ -306,6 +307,7 @@
when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new MockPropertyValue(null))
+ when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new MockPropertyValue(null))
when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new MockPropertyValue(null))
when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new MockPropertyValue(null))
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
index a9f6ef7..7042a96 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.nifi.dbcp;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.MockKerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
@@ -39,6 +40,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DBCPServiceTest {
private static final String SERVICE_ID = DBCPConnectionPool.class.getName();
@@ -70,6 +73,38 @@
}
@Test
+ public void testCustomValidateOfKerberosProperties() throws InitializationException {
+ // direct principal + password and no kerberos services is valid
+ runner.setProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL, "foo@FOO.COM");
+ runner.setProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD, "fooPassword");
+ runner.assertValid(service);
+
+ // direct principal + password with kerberos credential service is invalid
+ final KerberosCredentialsService kerberosCredentialsService = enabledKerberosCredentialsService(runner);
+ runner.setProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, kerberosCredentialsService.getIdentifier());
+ runner.assertNotValid(service);
+
+ // kerberos credential service by itself is valid
+ runner.removeProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL);
+ runner.removeProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD);
+ runner.assertValid(service);
+
+ // kerberos credential service with kerberos user service is invalid
+ final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
+ runner.setProperty(service, DBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+ runner.assertNotValid(service);
+
+ // kerberos user service by itself is valid
+ runner.removeProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE);
+ runner.assertValid(service);
+
+ // kerberos user service with direct principal + password is invalid
+ runner.setProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL, "foo@FOO.COM");
+ runner.setProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD, "fooPassword");
+ runner.assertNotValid(service);
+ }
+
+ @Test
public void testNotValidWithNegativeMinIdleProperty() {
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1");
runner.assertNotValid(service);
@@ -249,4 +284,24 @@
assertNotNull(connection);
}
}
+
+ private KerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException {
+ final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
+ when(kerberosUserService.getIdentifier()).thenReturn("userService1");
+ runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
+ runner.enableControllerService(kerberosUserService);
+ return kerberosUserService;
+ }
+
+ private KerberosCredentialsService enabledKerberosCredentialsService(final TestRunner runner) throws InitializationException {
+ final KerberosCredentialsService credentialsService = mock(KerberosCredentialsService.class);
+ when(credentialsService.getIdentifier()).thenReturn("credsService1");
+ when(credentialsService.getPrincipal()).thenReturn("principal1");
+ when(credentialsService.getKeytab()).thenReturn("keytab1");
+
+ runner.addControllerService(credentialsService.getIdentifier(), credentialsService);
+ runner.enableControllerService(credentialsService);
+ return credentialsService;
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
index 0727b95..f4ecfb2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml
@@ -55,8 +55,10 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
- <version>1.15.0-SNAPSHOT</version>
- <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
index 614d5f8..eb97cf5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java
@@ -40,6 +40,7 @@
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
@@ -263,6 +264,14 @@
.required(false)
.build();
+ public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-user-service")
+ .displayName("Kerberos User Service")
+ .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosUserService.class)
+ .required(false)
+ .build();
+
private File kerberosConfigFile = null;
private KerberosProperties kerberosProperties;
@@ -286,6 +295,7 @@
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
props.add(HADOOP_CONFIGURATION_RESOURCES);
+ props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
@@ -357,6 +367,7 @@
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final KerberosUserService kerberosUserService = validationContext.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
@@ -382,9 +393,15 @@
}
final Configuration hadoopConfig = resources.getConfiguration();
-
- problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
- resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+ if (kerberosUserService == null) {
+ problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
+ resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+ } else {
+ final boolean securityEnabled = SecurityUtil.isSecurityEnabled(hadoopConfig);
+ if (!securityEnabled) {
+ getLogger().warn("Hadoop Configuration does not have security enabled, KerberosUserService will be ignored");
+ }
+ }
}
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
@@ -395,6 +412,22 @@
.build());
}
+ if (kerberosUserService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
+ problems.add(new ValidationResult.Builder()
+ .subject("Kerberos User")
+ .valid(false)
+ .explanation("Cannot specify a Kerberos User Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
+ .build());
+ }
+
+ if (kerberosUserService != null && credentialsService != null) {
+ problems.add(new ValidationResult.Builder()
+ .subject("Kerberos User")
+ .valid(false)
+ .explanation("Cannot specify a Kerberos User Service while also specifying a Kerberos Credentials Service")
+ .build());
+ }
+
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
index 644c669..0ca543a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java
@@ -20,6 +20,7 @@
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockKerberosContext;
@@ -30,6 +31,9 @@
import java.io.File;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class HadoopDBCPConnectionPoolTest {
private File krbConfFile;
@@ -75,7 +79,7 @@
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
runner.assertValid(hadoopDBCPService);
- // Configure a KeberosCredentialService, should become invalid
+ // Configure a KerberosCredentialService, should become invalid
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService(
"nifi@EXAMPLE.COM", "src/test/resources/fake.keytab");
runner.addControllerService("kerb-credentials", kerberosCredentialsService);
@@ -90,6 +94,32 @@
// Remove principal property, only using keytab service, should become valid
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal());
runner.assertValid(hadoopDBCPService);
+
+ // Configure KerberosUserService, should be invalid since KerberosCredentialService also configured
+ final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
+ when(kerberosUserService.getIdentifier()).thenReturn("userService1");
+ runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
+ runner.enableControllerService(kerberosUserService);
+ runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+ runner.assertNotValid(hadoopDBCPService);
+
+ // Remove KerberosCredentialService, should be valid with only KerberosUserService
+ runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE);
+ runner.assertValid(hadoopDBCPService);
+
+ // Configure explicit principal and keytab, should be invalid while kerberos user service is set
+ runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "nifi@EXAMPLE.COM");
+ runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab(), "src/test/resources/fake.keytab");
+ runner.assertNotValid(hadoopDBCPService);
+
+ // Remove explicit keytab, set explicit password, still invalid while kerberos user service set
+ runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
+ runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPassword(), "password");
+ runner.assertNotValid(hadoopDBCPService);
+
+ // Remove kerberos user service, should be valid
+ runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE);
+ runner.assertValid(hadoopDBCPService);
}
@Test