NIFI-8955 Add Max Connection Lifetime property to Hive(_1_1)ConnectionPool CS

This closes #5259.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index a762844..777a5ab 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -128,6 +128,11 @@
             <version>1.8</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.15.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 4454386..959c31b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.dbcp.hive;
 
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.jdbc.HiveDriver;
@@ -26,6 +26,7 @@
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.ResourceCardinality;
@@ -33,6 +34,7 @@
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPValidator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
@@ -72,6 +74,8 @@
 public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {
     private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
+    private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
     public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
             .name("hive-db-connect-url")
             .displayName("Database Connection URL")
@@ -137,6 +141,18 @@
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+            .displayName("Max Connection Lifetime")
+            .name("hive-max-conn-lifetime")
+            .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+                    "connection pool will invalidate the connection. A value of zero or -1 " +
+                    "means the connection has an infinite lifetime.")
+            .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+            .required(true)
+            .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
             .name("Validation-query")
             .displayName("Validation query")
@@ -181,6 +197,7 @@
         props.add(DB_PASSWORD);
         props.add(MAX_WAIT_TIME);
         props.add(MAX_TOTAL_CONNECTIONS);
+        props.add(MAX_CONN_LIFETIME);
         props.add(VALIDATION_QUERY);
         props.add(KERBEROS_CREDENTIALS_SERVICE);
 
@@ -335,14 +352,16 @@
         final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
         final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
         final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+        final long maxConnectionLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
 
         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);
 
         connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
 
-        dataSource.setMaxWait(maxWaitMillis);
-        dataSource.setMaxActive(maxTotal);
+        dataSource.setMaxWaitMillis(maxWaitMillis);
+        dataSource.setMaxTotal(maxTotal);
+        dataSource.setMaxConnLifetimeMillis(maxConnectionLifetimeMillis);
 
         if (validationQuery != null && !validationQuery.isEmpty()) {
             dataSource.setValidationQuery(validationQuery);
@@ -426,4 +445,13 @@
     boolean isAllowExplicitKeytab() {
         return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
     }
+
+    private long extractMillisWithInfinite(PropertyValue prop) {
+        if (prop.getValue() == null || DEFAULT_MAX_CONN_LIFETIME.equals(prop.getValue())) {
+            return -1;
+        } else {
+            return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+        }
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
index b109740..be8c2c5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
@@ -31,7 +31,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
@@ -131,6 +131,7 @@
         final String USER = "user";
         final String PASS = "pass";
         final int MAX_CONN = 7;
+        final String MAX_CONN_LIFETIME = "1 sec";
         final String MAX_WAIT = "10 sec"; // 10000 milliseconds
         final String CONF = "/path/to/hive-site.xml";
         hiveConnectionPool = new HiveConnectionPool();
@@ -140,6 +141,7 @@
             put(HiveConnectionPool.DB_USER, "${username}");
             put(HiveConnectionPool.DB_PASSWORD, "${password}");
             put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+            put(HiveConnectionPool.MAX_CONN_LIFETIME, "${maxconnlifetime}");
             put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}");
             put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
         }};
@@ -149,6 +151,7 @@
         registry.setVariable(new VariableDescriptor("username"), USER);
         registry.setVariable(new VariableDescriptor("password"), PASS);
         registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+        registry.setVariable(new VariableDescriptor("maxconnlifetime"), MAX_CONN_LIFETIME);
         registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
         registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
 
@@ -162,8 +165,9 @@
         assertEquals(URL, basicDataSource.getUrl());
         assertEquals(USER, basicDataSource.getUsername());
         assertEquals(PASS, basicDataSource.getPassword());
-        assertEquals(MAX_CONN, basicDataSource.getMaxActive());
-        assertEquals(10000L, basicDataSource.getMaxWait());
+        assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
+        assertEquals(1000L, basicDataSource.getMaxConnLifetimeMillis());
+        assertEquals(10000L, basicDataSource.getMaxWaitMillis());
         assertEquals(URL, hiveConnectionPool.getConnectionURL());
     }
 
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
index 3fdaed2..d00eb17 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
@@ -132,6 +132,11 @@
             <version>1.8</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.15.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
index 2b47335..ebd0942 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.dbcp.hive;
 
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.jdbc.HiveDriver;
@@ -26,6 +26,7 @@
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.ResourceCardinality;
@@ -33,6 +34,7 @@
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPValidator;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.hadoop.SecurityUtil;
@@ -69,6 +71,9 @@
 @Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
 @CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive 1.1.x. Connections can be asked from pool and returned after usage.")
 public class Hive_1_1ConnectionPool extends AbstractControllerService implements Hive_1_1DBCPService {
+
+    private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
     public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
             .name("hive-db-connect-url")
             .displayName("Database Connection URL")
@@ -134,6 +139,18 @@
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+            .displayName("Max Connection Lifetime")
+            .name("hive-max-conn-lifetime")
+            .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+                    "connection pool will invalidate the connection. A value of zero or -1 " +
+                    "means the connection has an infinite lifetime.")
+            .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+            .required(true)
+            .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
             .name("Validation-query")
             .displayName("Validation query")
@@ -195,6 +212,7 @@
         props.add(DB_PASSWORD);
         props.add(MAX_WAIT_TIME);
         props.add(MAX_TOTAL_CONNECTIONS);
+        props.add(MAX_CONN_LIFETIME);
         props.add(VALIDATION_QUERY);
         props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(KERBEROS_PRINCIPAL);
@@ -334,14 +352,16 @@
         final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
         final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
         final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+        final long maxConnectionLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
 
         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);
 
         connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
 
-        dataSource.setMaxWait(maxWaitMillis);
-        dataSource.setMaxActive(maxTotal);
+        dataSource.setMaxWaitMillis(maxWaitMillis);
+        dataSource.setMaxTotal(maxTotal);
+        dataSource.setMaxConnLifetimeMillis(maxConnectionLifetimeMillis);
 
         if (validationQuery != null && !validationQuery.isEmpty()) {
             dataSource.setValidationQuery(validationQuery);
@@ -421,4 +441,11 @@
         return connectionUrl;
     }
 
+    private long extractMillisWithInfinite(PropertyValue prop) {
+        if (prop.getValue() == null || DEFAULT_MAX_CONN_LIFETIME.equals(prop.getValue())) {
+            return -1;
+        } else {
+            return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
index d129084..5881dba 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
@@ -32,7 +32,7 @@
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
@@ -113,6 +113,7 @@
         final String USER = "user";
         final String PASS = "pass";
         final int MAX_CONN = 7;
+        final String MAX_CONN_LIFETIME = "1 sec";
         final String MAX_WAIT = "10 sec"; // 10000 milliseconds
         final String CONF = "/path/to/hive-site.xml";
         hiveConnectionPool = new Hive_1_1ConnectionPool();
@@ -122,6 +123,7 @@
             put(Hive_1_1ConnectionPool.DB_USER, "${username}");
             put(Hive_1_1ConnectionPool.DB_PASSWORD, "${password}");
             put(Hive_1_1ConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+            put(Hive_1_1ConnectionPool.MAX_CONN_LIFETIME, "${maxconnlifetime}");
             put(Hive_1_1ConnectionPool.MAX_WAIT_TIME, "${maxwait}");
             put(Hive_1_1ConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
         }};
@@ -131,6 +133,7 @@
         registry.setVariable(new VariableDescriptor("username"), USER);
         registry.setVariable(new VariableDescriptor("password"), PASS);
         registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+        registry.setVariable(new VariableDescriptor("maxconnlifetime"), MAX_CONN_LIFETIME);
         registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
         registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
 
@@ -144,8 +147,9 @@
         assertEquals(URL, basicDataSource.getUrl());
         assertEquals(USER, basicDataSource.getUsername());
         assertEquals(PASS, basicDataSource.getPassword());
-        assertEquals(MAX_CONN, basicDataSource.getMaxActive());
-        assertEquals(10000L, basicDataSource.getMaxWait());
+        assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
+        assertEquals(1000L, basicDataSource.getMaxConnLifetimeMillis());
+        assertEquals(10000L, basicDataSource.getMaxWaitMillis());
         assertEquals(URL, hiveConnectionPool.getConnectionURL());
     }