blob: 48141a8b3a5e4b4e1a9436585aa95a7f450c69d0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.dbcp;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
* Implementation of the Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
@DynamicProperty(name = "JDBC property name",
value = "JDBC property value",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
description = "JDBC driver property name and value applied to JDBC connections."),
@DynamicProperty(name = "SENSITIVE.JDBC property name",
value = "JDBC property value",
expressionLanguageScope = ExpressionLanguageScope.NONE,
description = "JDBC driver property name prefixed with 'SENSITIVE.' handled as a sensitive property.")
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
/** Property Name Prefix for Sensitive Dynamic Properties */
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_IDLE} in Commons-DBCP 2.7.0
private static final String DEFAULT_MIN_IDLE = "0";
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MAX_IDLE} in Commons-DBCP 2.7.0
private static final String DEFAULT_MAX_IDLE = "8";
* Copied from private variable {@code BasicDataSource.maxConnLifetimeMillis} in Commons-DBCP 2.7.0
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
* Copied from {@link GenericObjectPoolConfig#DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.7.0
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
* Copied from {@link GenericObjectPoolConfig#DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
* and converted from 1800000L to "1800000 millis" to "30 mins"
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
* Copied from {@link GenericObjectPoolConfig#DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.7.0
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
.displayName("Database Driver Location(s)")
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'")
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("Database user name")
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.description("The password for the database user")
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.displayName("Validation query")
.description("Validation query used to validate connections before returning them. "
+ "When connection is invalid, it get's dropped and new valid connection will be returned. "
+ "Note!! Using validation might have some performance penalty.")
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
"created, or zero to create none.")
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
"released, or negative for no limit.")
// Descriptor for the maximum lifetime of a pooled connection.
// NOTE(review): the description below speaks of milliseconds, but onConfigured reads this property through
// extractMillisWithInfinite, which accepts the literal "-1" or otherwise parses the value as a time period.
// Consider aligning the wording with how the value is actually parsed.
// NOTE(review): the remainder of this builder chain is not visible in this view.
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
// Descriptor for the pause between runs of the idle connection evictor.
// NOTE(review): the description below speaks of a number of milliseconds, but onConfigured reads this property
// through extractMillisWithInfinite, which accepts the literal "-1" or otherwise parses the value as a time period.
// Consider aligning the wording with how the value is actually parsed.
// NOTE(review): the remainder of this builder chain is not visible in this view.
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.displayName("Kerberos Principal")
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
public static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
.displayName("Kerberos Password")
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
private static final List<PropertyDescriptor> properties;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
properties = Collections.unmodifiableList(props);
private volatile BasicDataSource dataSource;
private volatile KerberosUser kerberosUser;
/**
 * Returns the statically declared property descriptors of this service.
 *
 * @return the unmodifiable {@code properties} list assigned in the static initializer
 */
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
/**
 * Supplies the descriptor for a dynamic (user-added) property, distinguishing names that start with
 * {@code SENSITIVE_PROPERTY_PREFIX} from all other names.
 * NOTE(review): the builder chain and the bodies of both branches are truncated in this view, so the exact
 * settings applied to each kind of property cannot be confirmed from here.
 *
 * @param propertyDescriptorName name of the dynamic property being described
 */
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
// The validator is created for Expression Language with a STRING result type.
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
// Names carrying the "SENSITIVE." prefix take the first branch, every other name the second.
if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
} else {
/**
 * Checks that the Kerberos-related properties are configured consistently and collects one result per
 * violated rule.
 * NOTE(review): each ValidationResult.Builder chain is cut off after explanation(...) in this view.
 *
 * @param context validation context used to read the Kerberos properties
 * @return the results gathered by the checks below; empty when none of them fires
 */
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>();
// A principal or password counts as provided only when its value is not blank.
// The principal is read after Expression Language evaluation, the password as its raw value.
final boolean kerberosPrincipalProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue());
final boolean kerberosPasswordProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue());
// Rule 1: a principal without a password is rejected.
if (kerberosPrincipalProvided && !kerberosPasswordProvided) {
results.add(new ValidationResult.Builder()
.explanation("a password must be provided for the given principal")
// Rule 2: a password without a principal is rejected.
if (kerberosPasswordProvided && !kerberosPrincipalProvided) {
results.add(new ValidationResult.Builder()
.explanation("a principal must be provided for the given password")
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
// Rule 3: explicit principal/password may not be combined with the credentials service.
if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
results.add(new ValidationResult.Builder()
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
// Rule 4: explicit principal/password may not be combined with the user service.
if (kerberosUserService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
results.add(new ValidationResult.Builder()
.explanation("kerberos principal/password and kerberos user service cannot be configured at the same time")
// Rule 5: the user service and the credentials service are mutually exclusive.
if (kerberosUserService != null && kerberosCredentialsService != null) {
results.add(new ValidationResult.Builder()
.explanation("kerberos user service and kerberos credential service cannot be configured at the same time")
return results;
* Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with
* {@link ConfigurationContext}.
* This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal
* operation of the connection pool.
* @param context
* the configuration context
* @throws InitializationException
* if unable to create a database connection
public void onConfigured(final ConfigurationContext context) throws InitializationException {
// Step 1: read the connection and pool settings; most values are taken after Expression Language evaluation.
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
// Time-based settings go through extractMillisWithInfinite, which maps the literal "-1" to -1 and
// otherwise converts the value to milliseconds via asTimePeriod.
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
// Step 2: pick the Kerberos identity in priority order: user service, then credentials service (keytab),
// then an explicit principal/password pair. If none applies, kerberosUser is not assigned here.
if (kerberosUserService != null) {
kerberosUser = kerberosUserService.createKerberosUser();
} else if (kerberosCredentialsService != null) {
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
// NOTE(review): the try block below contains no statements in this view; the call that can raise
// KerberosLoginException appears to be missing - confirm against the complete file.
if (kerberosUser != null) {
try {
} catch (KerberosLoginException e) {
throw new InitializationException("Unable to authenticate Kerberos principal", e);
// Step 3: create the pool and hand it the driver resolved by getDriver(...).
// NOTE(review): the statements that apply the pool settings read above are not visible in this view.
dataSource = new BasicDataSource();
dataSource.setDriver(getDriver(driverName, dburl));
// NOTE(review): the body of this branch is not visible in this view.
if (validationQuery!=null && !validationQuery.isEmpty()) {
// NOTE(review): the expression that builds dynamicProperties is cut off after getProperties() in this view.
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
// Step 4: forward each dynamic property to the pool as a JDBC connection property.
dynamicProperties.forEach((descriptor) -> {
final PropertyValue propertyValue = context.getProperty(descriptor);
if (descriptor.isSensitive()) {
// Sensitive properties are registered under the name that follows the "SENSITIVE." prefix,
// using the raw value without Expression Language evaluation.
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
dataSource.addConnectionProperty(propertyName, propertyValue.getValue());
} else {
// Other properties keep their name and use the value after Expression Language evaluation.
dataSource.addConnectionProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
private Driver getDriver(final String driverName, final String url) {
final Class<?> clazz;
try {
clazz = Class.forName(driverName);
} catch (final ClassNotFoundException e) {
throw new ProcessException("Driver class " + driverName + " is not found", e);
try {
return DriverManager.getDriver(url);
} catch (final SQLException e) {
// In case the driver is not registered by the implementation, we explicitly try to register it.
try {
final Driver driver = (Driver) clazz.newInstance();
return DriverManager.getDriver(url);
} catch (final SQLException e2) {
throw new ProcessException("No suitable driver for the given Database Connection URL", e2);
} catch (final IllegalAccessException | InstantiationException e2) {
throw new ProcessException("Creating driver instance is failed", e2);
private Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out.
* If a {@link LoginException} occurs while attempting to log out the principal,
* an attempt will still be made to shut down the pool and close open connections.
* @throws SQLException if there is an error while closing open connections
* @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
* no exception while closing open connections
public void shutdown() throws SQLException {
// Both references are reset inside finally blocks, so they are cleared even when the preceding step throws.
try {
if (kerberosUser != null) {
// NOTE(review): this branch is empty in this view; the logout call described in the method's
// documentation is not visible - confirm against the complete file.
} finally {
kerberosUser = null;
try {
if (dataSource != null) {
// NOTE(review): this branch is empty in this view; the call that closes the pool is not visible -
// confirm against the complete file.
} finally {
dataSource = null;
/**
 * Obtains a connection from {@code dataSource}, going through a KerberosAction when a Kerberos user is set.
 *
 * @return the connection handed out by the data source
 * @throws ProcessException wrapping the SQLException raised while obtaining the connection, or wrapping a
 *         KerberosLoginException raised while handling that failure
 */
public Connection getConnection() throws ProcessException {
try {
final Connection con;
if (kerberosUser != null) {
// With a Kerberos user present the borrow is wrapped in a KerberosAction bound to that user and this
// component's logger (presumably so the call runs as that user - confirm against KerberosAction).
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
con = kerberosAction.execute();
} else {
con = dataSource.getConnection();
return con;
} catch (final SQLException e) {
// If using Kerberos, attempt to re-login
if (kerberosUser != null) {
try {
getLogger().info("Error getting connection, performing Kerberos re-login");
// NOTE(review): only the log statement is visible inside this try; the re-login call itself appears
// to be missing from this view - confirm against the complete file.
} catch (KerberosLoginException le) {
throw new ProcessException("Unable to authenticate Kerberos principal", le);
// As far as this view shows, the original SQLException is then rethrown wrapped in a ProcessException;
// no second attempt to obtain a connection is visible here.
throw new ProcessException(e);
public String toString() {
return "DBCPConnectionPool[id=" + getIdentifier() + "]";
/**
 * Exposes the underlying pool with package-private visibility.
 *
 * @return the current {@code dataSource}; {@code null} when no pool is assigned, for example after
 *         {@code shutdown()} has cleared the field
 */
BasicDataSource getDataSource() {
return dataSource;