NIFI-8955 Add Max Connection Lifetime property to Hive(_1_1)ConnectionPool CS
This closes #5259.
Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index a762844..777a5ab 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -128,6 +128,11 @@
<version>1.8</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 4454386..959c31b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.dbcp.hive;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
@@ -26,6 +26,7 @@
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
@@ -33,6 +34,7 @@
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPValidator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
@@ -72,6 +74,8 @@
public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
+ private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("hive-db-connect-url")
.displayName("Database Connection URL")
@@ -137,6 +141,18 @@
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+ .displayName("Max Connection Lifetime")
+ .name("hive-max-conn-lifetime")
+ .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+ "connection pool will invalidate the connection. A value of zero or -1 " +
+ "means the connection has an infinite lifetime.")
+ .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+ .required(true)
+ .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
@@ -181,6 +197,7 @@
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
+ props.add(MAX_CONN_LIFETIME);
props.add(VALIDATION_QUERY);
props.add(KERBEROS_CREDENTIALS_SERVICE);
@@ -335,14 +352,16 @@
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+ final long maxConnectionLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
- dataSource.setMaxWait(maxWaitMillis);
- dataSource.setMaxActive(maxTotal);
+ dataSource.setMaxWaitMillis(maxWaitMillis);
+ dataSource.setMaxTotal(maxTotal);
+ dataSource.setMaxConnLifetimeMillis(maxConnectionLifetimeMillis);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
@@ -426,4 +445,13 @@
boolean isAllowExplicitKeytab() {
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
}
+
+ private long extractMillisWithInfinite(PropertyValue prop) {
+ if (prop.getValue() == null || DEFAULT_MAX_CONN_LIFETIME.equals(prop.getValue())) {
+ return -1;
+ } else {
+ return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+ }
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
index b109740..be8c2c5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
@@ -31,7 +31,7 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
@@ -131,6 +131,7 @@
final String USER = "user";
final String PASS = "pass";
final int MAX_CONN = 7;
+ final String MAX_CONN_LIFETIME = "1 sec";
final String MAX_WAIT = "10 sec"; // 10000 milliseconds
final String CONF = "/path/to/hive-site.xml";
hiveConnectionPool = new HiveConnectionPool();
@@ -140,6 +141,7 @@
put(HiveConnectionPool.DB_USER, "${username}");
put(HiveConnectionPool.DB_PASSWORD, "${password}");
put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+ put(HiveConnectionPool.MAX_CONN_LIFETIME, "${maxconnlifetime}");
put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}");
put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
}};
@@ -149,6 +151,7 @@
registry.setVariable(new VariableDescriptor("username"), USER);
registry.setVariable(new VariableDescriptor("password"), PASS);
registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+ registry.setVariable(new VariableDescriptor("maxconnlifetime"), MAX_CONN_LIFETIME);
registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
@@ -162,8 +165,9 @@
assertEquals(URL, basicDataSource.getUrl());
assertEquals(USER, basicDataSource.getUsername());
assertEquals(PASS, basicDataSource.getPassword());
- assertEquals(MAX_CONN, basicDataSource.getMaxActive());
- assertEquals(10000L, basicDataSource.getMaxWait());
+ assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
+ assertEquals(1000L, basicDataSource.getMaxConnLifetimeMillis());
+ assertEquals(10000L, basicDataSource.getMaxWaitMillis());
assertEquals(URL, hiveConnectionPool.getConnectionURL());
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
index 3fdaed2..d00eb17 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
@@ -132,6 +132,11 @@
<version>1.8</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
index 2b47335..ebd0942 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.dbcp.hive;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
@@ -26,6 +26,7 @@
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
@@ -33,6 +34,7 @@
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPValidator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
@@ -69,6 +71,9 @@
@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive 1.1.x. Connections can be asked from pool and returned after usage.")
public class Hive_1_1ConnectionPool extends AbstractControllerService implements Hive_1_1DBCPService {
+
+ private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("hive-db-connect-url")
.displayName("Database Connection URL")
@@ -134,6 +139,18 @@
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+ .displayName("Max Connection Lifetime")
+ .name("hive-max-conn-lifetime")
+ .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+ "connection pool will invalidate the connection. A value of zero or -1 " +
+ "means the connection has an infinite lifetime.")
+ .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+ .required(true)
+ .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
@@ -195,6 +212,7 @@
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
+ props.add(MAX_CONN_LIFETIME);
props.add(VALIDATION_QUERY);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
@@ -334,14 +352,16 @@
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+ final long maxConnectionLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
- dataSource.setMaxWait(maxWaitMillis);
- dataSource.setMaxActive(maxTotal);
+ dataSource.setMaxWaitMillis(maxWaitMillis);
+ dataSource.setMaxTotal(maxTotal);
+ dataSource.setMaxConnLifetimeMillis(maxConnectionLifetimeMillis);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
@@ -421,4 +441,11 @@
return connectionUrl;
}
+ private long extractMillisWithInfinite(PropertyValue prop) {
+ if (prop.getValue() == null || DEFAULT_MAX_CONN_LIFETIME.equals(prop.getValue())) {
+ return -1;
+ } else {
+ return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
index d129084..5881dba 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
@@ -32,7 +32,7 @@
import java.util.Map;
import java.util.UUID;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
@@ -113,6 +113,7 @@
final String USER = "user";
final String PASS = "pass";
final int MAX_CONN = 7;
+ final String MAX_CONN_LIFETIME = "1 sec";
final String MAX_WAIT = "10 sec"; // 10000 milliseconds
final String CONF = "/path/to/hive-site.xml";
hiveConnectionPool = new Hive_1_1ConnectionPool();
@@ -122,6 +123,7 @@
put(Hive_1_1ConnectionPool.DB_USER, "${username}");
put(Hive_1_1ConnectionPool.DB_PASSWORD, "${password}");
put(Hive_1_1ConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+ put(Hive_1_1ConnectionPool.MAX_CONN_LIFETIME, "${maxconnlifetime}");
put(Hive_1_1ConnectionPool.MAX_WAIT_TIME, "${maxwait}");
put(Hive_1_1ConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
}};
@@ -131,6 +133,7 @@
registry.setVariable(new VariableDescriptor("username"), USER);
registry.setVariable(new VariableDescriptor("password"), PASS);
registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+ registry.setVariable(new VariableDescriptor("maxconnlifetime"), MAX_CONN_LIFETIME);
registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
@@ -144,8 +147,9 @@
assertEquals(URL, basicDataSource.getUrl());
assertEquals(USER, basicDataSource.getUsername());
assertEquals(PASS, basicDataSource.getPassword());
- assertEquals(MAX_CONN, basicDataSource.getMaxActive());
- assertEquals(10000L, basicDataSource.getMaxWait());
+ assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
+ assertEquals(1000L, basicDataSource.getMaxConnLifetimeMillis());
+ assertEquals(10000L, basicDataSource.getMaxWaitMillis());
assertEquals(URL, hiveConnectionPool.getConnectionURL());
}