DRILL-7467: Jdbc plugin enhancements and fixes

1. Added logic to close the data source when the plugin is closed.
2. Added a disabled JDBC plugin template to the bootstrap storage plugins.
3. Added a new JDBC storage plugin configuration property, sourceParameters, which allows setting the data source parameters described in the BasicDataSource Configuration Parameters documentation (see the sketch after this list).
4. Upgraded the commons-dbcp2 version and added it to the dependency management section of the root pom.xml.
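
For illustration, a minimal sketch of how the new sourceParameters property is applied (not part of the patch; it mirrors the new TestBasicDataSource cases). The example class name is illustrative, the driver/url/credential values are placeholders, and the package declaration is needed because initDataSource is package-private. Each map entry is matched against a BasicDataSource field and its public setter is invoked, so maxIdle ends up calling setMaxIdle(int):

    package org.apache.drill.exec.store.jdbc;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.commons.dbcp2.BasicDataSource;

    public class SourceParametersExample {
      public static void main(String[] args) throws Exception {
        // Each entry maps to a BasicDataSource setter, e.g. maxIdle -> setMaxIdle(int).
        Map<String, Object> sourceParameters = new HashMap<>();
        sourceParameters.put("maxIdle", 8);
        sourceParameters.put("cacheState", false);

        JdbcStorageConfig config = new JdbcStorageConfig(
            "org.h2.Driver", "jdbc:h2:mem:test", "user", "password",
            false /* caseInsensitiveTableNames */, sourceParameters);

        // Unknown parameter names are logged and ignored; invalid values fail with a UserException.
        try (BasicDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
          System.out.println(dataSource.getMaxIdle()); // prints 8
        }
      }
    }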

closes #1956
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index 7a5fb69..95c0b6f 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -30,7 +30,7 @@
  */
 public class AutoCloseables {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(AutoCloseables.class);
+  private static final Logger logger = LoggerFactory.getLogger(AutoCloseables.class);
 
   public interface Closeable extends AutoCloseable {
     @Override
@@ -38,12 +38,7 @@
   }
 
   public static AutoCloseable all(final Collection<? extends AutoCloseable> autoCloseables) {
-    return new AutoCloseable() {
-      @Override
-      public void close() throws Exception {
-        AutoCloseables.close(autoCloseables);
-      }
-    };
+    return () -> close(autoCloseables);
   }
 
   /**
@@ -80,7 +75,7 @@
     try {
       close(Arrays.asList(autoCloseables));
     } catch (Exception e) {
-      throw UserException.dataReadError(e).build(LOGGER);
+      throw UserException.dataReadError(e).build(logger);
     }
   }
 
@@ -88,9 +83,9 @@
    * Closes all autoCloseables if not null and suppresses subsequent exceptions if more than one
    * @param autoCloseables the closeables to close
    */
-  public static void close(Iterable<? extends AutoCloseable> ac) throws Exception {
+  public static void close(Iterable<? extends AutoCloseable> autoCloseables) throws Exception {
     Exception topLevelException = null;
-    for (AutoCloseable closeable : ac) {
+    for (AutoCloseable closeable : autoCloseables) {
       try {
         if (closeable != null) {
           closeable.close();
@@ -110,7 +105,7 @@
 
   /**
    * Close all without caring about thrown exceptions
-   * @param closeables - array containing auto closeables
+   * @param closeables array containing auto closeables
    */
   public static void closeSilently(AutoCloseable... closeables) {
     Arrays.stream(closeables).filter(Objects::nonNull)
@@ -118,9 +113,8 @@
           try {
             target.close();
           } catch (Exception e) {
-            LOGGER.warn(String.format("Exception was thrown while closing auto closeable: %s", target), e);
+            logger.warn("Exception was thrown while closing auto closeable: {}", target, e);
           }
         });
   }
-
 }
diff --git a/contrib/storage-jdbc/pom.xml b/contrib/storage-jdbc/pom.xml
index f702a2e..299d9db 100755
--- a/contrib/storage-jdbc/pom.xml
+++ b/contrib/storage-jdbc/pom.xml
@@ -40,6 +40,10 @@
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
index ac95867..c0b28bf 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
@@ -58,7 +58,7 @@
 
   static class DrillJdbcProjectRule extends DrillJdbcRuleBase {
 
-    public DrillJdbcProjectRule(JdbcConvention out) {
+    DrillJdbcProjectRule(JdbcConvention out) {
       super(LogicalProject.class, Convention.NONE, out, "DrillJdbcProjectRule");
     }
 
@@ -89,7 +89,7 @@
 
   static class DrillJdbcFilterRule extends DrillJdbcRuleBase {
 
-    public DrillJdbcFilterRule(JdbcConvention out) {
+    DrillJdbcFilterRule(JdbcConvention out) {
       super(LogicalFilter.class, Convention.NONE, out, "DrillJdbcFilterRule");
     }
 
@@ -113,9 +113,8 @@
         return true;
 
       } catch (ExecutionException e) {
-        throw new IllegalStateException("Failure while trying to evaluate pushdown.", e);
+        throw new IllegalStateException("Failure while trying to evaluate push down.", e);
       }
     }
   }
-
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
index 9073f4d..f5db0b7 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -35,7 +35,7 @@
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     JdbcStoragePlugin plugin = config.getPlugin();
-    RecordReader reader = new JdbcRecordReader(plugin.getSource(),
+    RecordReader reader = new JdbcRecordReader(plugin.getDataSource(),
         config.getSql(), plugin.getName(), config.getColumns());
     return new ScanBatch(config, context, Collections.singletonList(reader));
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
index 5054868..14e29d6 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
@@ -47,5 +47,4 @@
   public LogicalOperator implement(DrillImplementor implementor) {
     throw new UnsupportedOperationException();
   }
-
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
index 4637abb..8f66610 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
@@ -43,7 +43,7 @@
 
   private static final JdbcExpressionCheck INSTANCE = new JdbcExpressionCheck();
 
-  public static boolean isOnlyStandardExpressions(RexNode rex) {
+  static boolean isOnlyStandardExpressions(RexNode rex) {
     return rex.accept(INSTANCE);
   }
 
@@ -64,9 +64,9 @@
 
   @Override
   public Boolean visitCall(RexCall paramRexCall) {
-    if(paramRexCall.getOperator() instanceof DrillSqlOperator){
+    if (paramRexCall.getOperator() instanceof DrillSqlOperator) {
       return false;
-    }else{
+    } else {
       for (RexNode operand : paramRexCall.operands) {
         if (!operand.accept(this)) {
           return false;
@@ -81,9 +81,9 @@
       return false;
     }
 
-    final RexWindow window = over.getWindow();
+    RexWindow window = over.getWindow();
     for (RexFieldCollation orderKey : window.orderKeys) {
-      if (!((RexNode) orderKey.left).accept(this)) {
+      if (!orderKey.left.accept(this)) {
         return false;
       }
     }
@@ -132,5 +132,4 @@
   public Boolean visitPatternFieldRef(RexPatternFieldRef fieldRef) {
     return false;
   }
-
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
index 5774f6c..baaa0b5 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.calcite.plan.RelOptCluster;
@@ -42,7 +41,7 @@
   }
 
   @Override
-  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
     throw new UnsupportedOperationException();
   }
 
@@ -67,7 +66,7 @@
   }
 
   @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) {
     throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
   }
 
@@ -75,5 +74,4 @@
   public boolean needsFinalColumnReordering() {
     return false;
   }
-
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index 72120e6..d30feb0 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -69,7 +69,7 @@
   //Substitute newline. Also stripping away single line comments. Expecting hints to be nested in '/* <hint> */'
   private String stripToOneLineSql(String sql) {
     StringBuilder strippedSqlTextBldr = new StringBuilder(sql.length());
-    String sqlToken[] = sql.split("\\n");
+    String[] sqlToken = sql.split("\\n");
     for (String sqlTextLine : sqlToken) {
       if (!sqlTextLine.trim().startsWith("--")) { //Skip comments
         strippedSqlTextBldr.append(sqlTextLine).append(' ');
@@ -78,7 +78,7 @@
     return strippedSqlTextBldr.toString();
   }
 
-  private class SubsetRemover extends RelShuttleImpl {
+  private static class SubsetRemover extends RelShuttleImpl {
 
     @Override
     public RelNode visit(RelNode other) {
@@ -135,5 +135,4 @@
   public boolean needsFinalColumnReordering() {
     return false;
   }
-
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 370ae1f..7966ca0 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -61,11 +61,12 @@
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked")
 class JdbcRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(JdbcRecordReader.class);
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordReader.class);
 
   private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
   private final DataSource source;
@@ -86,7 +87,7 @@
   }
 
   static {
-    JDBC_TYPE_MAPPINGS = (ImmutableMap<Integer, MinorType>) (Object) ImmutableMap.builder()
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<Integer, MinorType>builder()
         .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
         .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
         .put(java.sql.Types.TINYINT, MinorType.INT)
@@ -129,10 +130,10 @@
             f.getType() == int.class &&
             f.getInt(null) == javaSqlType) {
           return f.getName();
-
         }
       }
     } catch (IllegalArgumentException | IllegalAccessException e) {
+      logger.trace("Unable to SQL type {} into String: {}", javaSqlType, e.getMessage());
     }
 
     return Integer.toString(javaSqlType);
@@ -140,7 +141,6 @@
   }
 
   private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) {
-
     switch (jdbcType) {
       case java.sql.Types.BIGINT:
         return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator());
@@ -199,8 +199,8 @@
                   "Expected columns: %s\n" +
                   "Returned columns count: %s",
               columns, columnsCount)
-            .addContext("sql", sql)
-            .addContext("plugin", storagePluginName)
+            .addContext("Sql", sql)
+            .addContext("Plugin", storagePluginName)
             .build(logger);
       }
       ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
@@ -214,7 +214,6 @@
         int scale = meta.getScale(i);
         MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
         if (minorType == null) {
-
           logger.warn("Ignoring column that is unsupported.", UserException
               .unsupportedError()
               .message(
@@ -222,11 +221,10 @@
                       + "The column's name was %s and its JDBC data type was %s. ",
                   name,
                   nameFromType(jdbcType))
-              .addContext("sql", sql)
-              .addContext("column Name", name)
-              .addContext("plugin", storagePluginName)
+              .addContext("Sql", sql)
+              .addContext("Column Name", name)
+              .addContext("Plugin", storagePluginName)
               .build(logger));
-
           continue;
         }
 
@@ -242,7 +240,6 @@
         ValueVector vector = output.addField(field, clazz);
         vectorBuilder.add(vector);
         copierBuilder.add(getCopier(jdbcType, i, resultSet, vector));
-
       }
 
       vectors = vectorBuilder.build();
@@ -251,13 +248,12 @@
     } catch (SQLException | SchemaChangeException e) {
       throw UserException.dataReadError(e)
           .message("The JDBC storage plugin failed while trying setup the SQL query. ")
-          .addContext("sql", sql)
-          .addContext("plugin", storagePluginName)
+          .addContext("Sql", sql)
+          .addContext("Plugin", storagePluginName)
           .build(logger);
     }
   }
 
-
   @Override
   public int next() {
     int counter = 0;
@@ -276,21 +272,22 @@
       throw UserException
           .dataReadError(e)
           .message("Failure while attempting to read from database.")
-          .addContext("sql", sql)
-          .addContext("plugin", storagePluginName)
+          .addContext("Sql", sql)
+          .addContext("Plugin", storagePluginName)
           .build(logger);
     }
 
+    int valueCount = Math.max(counter, 0);
     for (ValueVector vv : vectors) {
-      vv.getMutator().setValueCount(counter > 0 ? counter : 0);
+      vv.getMutator().setValueCount(valueCount);
     }
 
-    return counter>0 ? counter : 0;
+    return valueCount;
   }
 
   @Override
-  public void close() throws Exception {
-    AutoCloseables.close(resultSet, statement, connection);
+  public void close() {
+    AutoCloseables.closeSilently(resultSet, statement, connection);
   }
 
   @Override
@@ -300,13 +297,12 @@
         + "]";
   }
 
-  private abstract class Copier<T extends ValueVector.Mutator> {
-    protected final int columnIndex;
-    protected final ResultSet result;
-    protected final T mutator;
+  private abstract static class Copier<T extends ValueVector.Mutator> {
+    final int columnIndex;
+    final ResultSet result;
+    final T mutator;
 
-    public Copier(int columnIndex, ResultSet result, T mutator) {
-      super();
+    Copier(int columnIndex, ResultSet result, T mutator) {
       this.columnIndex = columnIndex;
       this.result = result;
       this.mutator = mutator;
@@ -315,8 +311,9 @@
     abstract void copy(int index) throws SQLException;
   }
 
-  private class IntCopier extends Copier<NullableIntVector.Mutator> {
-    public IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
+  private static class IntCopier extends Copier<NullableIntVector.Mutator> {
+
+    IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
       super(offset, set, mutator);
     }
 
@@ -329,8 +326,9 @@
     }
   }
 
-  private class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
-    public BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
+  private static class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
+
+    BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
       super(offset, set, mutator);
     }
 
@@ -341,12 +339,11 @@
         mutator.setNull(index);
       }
     }
-
   }
 
-  private class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
+  private static class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
 
-    public Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
+    Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -357,13 +354,11 @@
         mutator.setNull(index);
       }
     }
-
   }
 
+  private static class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
 
-  private class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
-
-    public Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
+    Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -373,14 +368,12 @@
       if (result.wasNull()) {
         mutator.setNull(index);
       }
-
     }
-
   }
 
-  private class DecimalCopier extends Copier<NullableVarDecimalVector.Mutator> {
+  private static class DecimalCopier extends Copier<NullableVarDecimalVector.Mutator> {
 
-    public DecimalCopier(int columnIndex, ResultSet result, NullableVarDecimalVector.Mutator mutator) {
+    DecimalCopier(int columnIndex, ResultSet result, NullableVarDecimalVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -391,29 +384,27 @@
         mutator.setSafe(index, decimal);
       }
     }
-
   }
 
-  private class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
+  private static class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
 
-    public VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
+    VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
     @Override
     void copy(int index) throws SQLException {
-      String val = resultSet.getString(columnIndex);
+      String val = result.getString(columnIndex);
       if (val != null) {
         byte[] record = val.getBytes(Charsets.UTF_8);
         mutator.setSafe(index, record, 0, record.length);
       }
     }
-
   }
 
-  private class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
+  private static class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
 
-    public VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
+    VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -424,14 +415,13 @@
         mutator.setSafe(index, record, 0, record.length);
       }
     }
-
   }
 
-  private class DateCopier extends Copier<NullableDateVector.Mutator> {
+  private static class DateCopier extends Copier<NullableDateVector.Mutator> {
 
     private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
 
-    public DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
+    DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -442,14 +432,13 @@
         mutator.setSafe(index, date.getTime());
       }
     }
-
   }
 
-  private class TimeCopier extends Copier<NullableTimeVector.Mutator> {
+  private static class TimeCopier extends Copier<NullableTimeVector.Mutator> {
 
     private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
 
-    public TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
+    TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -459,17 +448,14 @@
       if (time != null) {
         mutator.setSafe(index, (int) time.getTime());
       }
-
     }
-
   }
 
-
-  private class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
+  private static class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
 
     private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
 
-    public TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
+    TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -479,14 +465,12 @@
       if (stamp != null) {
         mutator.setSafe(index, stamp.getTime());
       }
-
     }
-
   }
 
-  private class BitCopier extends Copier<NullableBitVector.Mutator> {
+  private static class BitCopier extends Copier<NullableBitVector.Mutator> {
 
-    public BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
+    BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
       super(columnIndex, result, mutator);
     }
 
@@ -497,6 +481,5 @@
         mutator.setNull(index);
       }
     }
-
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
index 571490c..9124c40 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonFilter;
@@ -37,6 +39,7 @@
   private final String username;
   private final String password;
   private final boolean caseInsensitiveTableNames;
+  private final Map<String, Object> sourceParameters;
 
   @JsonCreator
   public JdbcStorageConfig(
@@ -44,13 +47,14 @@
       @JsonProperty("url") String url,
       @JsonProperty("username") String username,
       @JsonProperty("password") String password,
-      @JsonProperty("caseInsensitiveTableNames") boolean caseInsensitiveTableNames) {
-    super();
+      @JsonProperty("caseInsensitiveTableNames") boolean caseInsensitiveTableNames,
+      @JsonProperty("sourceParameters") Map<String, Object> sourceParameters) {
     this.driver = driver;
     this.url = url;
     this.username = username;
     this.password = password;
     this.caseInsensitiveTableNames = caseInsensitiveTableNames;
+    this.sourceParameters = sourceParameters == null ? Collections.emptyMap() : sourceParameters;
   }
 
   public String getDriver() {
@@ -74,30 +78,29 @@
     return caseInsensitiveTableNames;
   }
 
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((driver == null) ? 0 : driver.hashCode());
-    result = prime * result + ((password == null) ? 0 : password.hashCode());
-    result = prime * result + ((url == null) ? 0 : url.hashCode());
-    result = prime * result + ((username == null) ? 0 : username.hashCode());
-    result = prime * result + (caseInsensitiveTableNames ? 1231 : 1237);
-    return result;
+  public Map<String, Object> getSourceParameters() {
+    return sourceParameters;
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
+  public int hashCode() {
+    return Objects.hash(driver, url, username, password, caseInsensitiveTableNames, sourceParameters);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
-    } else if (obj == null || getClass() != obj.getClass()) {
+    }
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    JdbcStorageConfig conf = (JdbcStorageConfig) obj;
-    return caseInsensitiveTableNames == conf.caseInsensitiveTableNames
-        && Objects.equals(driver, conf.driver)
-        && Objects.equals(password, conf.password)
-        && Objects.equals(url, conf.url)
-        && Objects.equals(username, conf.username);
+    JdbcStorageConfig that = (JdbcStorageConfig) o;
+    return caseInsensitiveTableNames == that.caseInsensitiveTableNames &&
+        Objects.equals(driver, that.driver) &&
+        Objects.equals(url, that.url) &&
+        Objects.equals(username, that.username) &&
+        Objects.equals(password, that.password) &&
+        Objects.equals(sourceParameters, that.sourceParameters);
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 74a0507..a00068e 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.Map;
 import javax.sql.DataSource;
 import java.util.Set;
 
@@ -26,42 +30,37 @@
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlDialectFactoryImpl;
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
+  private static final Logger logger = LoggerFactory.getLogger(JdbcStoragePlugin.class);
+
   private final JdbcStorageConfig config;
-  private final DataSource source;
+  private final BasicDataSource dataSource;
   private final SqlDialect dialect;
   private final DrillJdbcConvention convention;
 
   public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) {
     super(context, name);
     this.config = config;
-    BasicDataSource source = new BasicDataSource();
-    source.setDriverClassName(config.getDriver());
-    source.setUrl(config.getUrl());
-
-    if (config.getUsername() != null) {
-      source.setUsername(config.getUsername());
-    }
-
-    if (config.getPassword() != null) {
-      source.setPassword(config.getPassword());
-    }
-
-    this.source = source;
-    this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, source);
+    this.dataSource = initDataSource(config);
+    this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource);
     this.convention = new DrillJdbcConvention(dialect, name, this);
   }
 
-
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
-    JdbcCatalogSchema schema = new JdbcCatalogSchema(getName(), source, dialect, convention,
+    JdbcCatalogSchema schema = new JdbcCatalogSchema(getName(), dataSource, dialect, convention,
         !this.config.areTableNamesCaseInsensitive());
     SchemaPlus holder = parent.add(getName(), schema);
     schema.setHolder(holder);
@@ -77,8 +76,8 @@
     return true;
   }
 
-  public DataSource getSource() {
-    return source;
+  public DataSource getDataSource() {
+    return dataSource;
   }
 
   public SqlDialect getDialect() {
@@ -89,4 +88,51 @@
   public Set<RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext context) {
     return convention.getRules();
   }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(dataSource);
+  }
+
+  /**
+   * Initializes a {@link BasicDataSource} instance and configures it based on the given
+   * storage plugin configuration.
+   * Basic parameters such as driver, url, user name and password are set using dedicated setters.
+   * Other source parameters are set dynamically by invoking the setter that matches the given parameter name.
+   * If a given parameter name has no matching field, it is ignored. If its value is incorrect
+   * (for example, a String is passed instead of an int), data source initialization will fail.
+   * Parameter names should correspond to the names listed in the documentation:
+   * <a href="https://commons.apache.org/proper/commons-dbcp/configuration.html">BasicDataSource Configuration Parameters</a>.
+   *
+   * @param config storage plugin config
+   * @return basic data source instance
+   * @throws UserException if unable to set source parameter
+   */
+  @VisibleForTesting
+  static BasicDataSource initDataSource(JdbcStorageConfig config) {
+    BasicDataSource dataSource = new BasicDataSource();
+    dataSource.setDriverClassName(config.getDriver());
+    dataSource.setUrl(config.getUrl());
+    dataSource.setUsername(config.getUsername());
+    dataSource.setPassword(config.getPassword());
+
+    MethodHandles.Lookup publicLookup = MethodHandles.publicLookup();
+    for (Map.Entry<String, Object> entry : config.getSourceParameters().entrySet()) {
+      try {
+        Class<?> parameterType = dataSource.getClass().getDeclaredField(entry.getKey()).getType();
+        MethodType methodType = MethodType.methodType(void.class, parameterType);
+        MethodHandle methodHandle = publicLookup.findVirtual(dataSource.getClass(),
+          "set" + StringUtils.capitalize(entry.getKey()), methodType);
+        methodHandle.invokeWithArguments(dataSource, entry.getValue());
+      } catch (ReflectiveOperationException e) {
+        logger.warn("Unable to find / access setter for parameter {}: {}", entry.getKey(), e.getMessage());
+      } catch (Throwable e) {
+        throw UserException.connectionError()
+          .message("Unable to set value %s for parameter %s", entry.getKey(), entry.getValue())
+          .addContext("Error message:", e.getMessage())
+          .build(logger);
+      }
+    }
+    return dataSource;
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..35a5c00
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,16 @@
+{
+  "storage": {
+    "rdbms": {
+      "type": "jdbc",
+      "driver": "xxx.Driver",
+      "url": "jdbc:xxx:xxx",
+      "username": "xxx",
+      "password": "xxx",
+      "caseInsensitiveTableNames": false,
+      "sourceParameters" : {
+        "maxIdle" : 8
+      },
+      "enabled": false
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestBasicDataSource.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestBasicDataSource.java
new file mode 100644
index 0000000..f8b8b1e
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestBasicDataSource.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.test.BaseTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class TestBasicDataSource extends BaseTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testInitWithoutUserAndPassword() throws Exception {
+    JdbcStorageConfig config = new JdbcStorageConfig(
+      "driver", "url", null, null, false, null);
+    try (BasicDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      assertEquals("driver", dataSource.getDriverClassName());
+      assertEquals("url", dataSource.getUrl());
+      assertNull(dataSource.getUsername());
+      assertNull(dataSource.getPassword());
+    }
+  }
+
+  @Test
+  public void testInitWithUserAndPassword() throws Exception {
+    JdbcStorageConfig config = new JdbcStorageConfig(
+      "driver", "url", "user", "password", false, null);
+    try (BasicDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      assertEquals("user", dataSource.getUsername());
+      assertEquals("password", dataSource.getPassword());
+    }
+  }
+
+  @Test
+  public void testInitWithSourceParameters() throws Exception {
+    Map<String, Object> sourceParameters = new HashMap<>();
+    sourceParameters.put("maxIdle", 5);
+    sourceParameters.put("cacheState", false);
+    sourceParameters.put("validationQuery", "select * from information_schema.collations");
+    JdbcStorageConfig config = new JdbcStorageConfig(
+      "driver", "url", "user", "password", false, sourceParameters);
+    try (BasicDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      assertEquals(5, dataSource.getMaxIdle());
+      assertFalse(dataSource.getCacheState());
+      assertEquals("select * from information_schema.collations", dataSource.getValidationQuery());
+    }
+  }
+
+  @Test
+  public void testInitWithIncorrectSourceParameterName() throws Exception {
+    Map<String, Object> sourceParameters = new HashMap<>();
+    sourceParameters.put("maxIdle", 5);
+    sourceParameters.put("abc", "abc");
+    sourceParameters.put("cacheState", false);
+    sourceParameters.put("validationQuery", null);
+    JdbcStorageConfig config = new JdbcStorageConfig(
+      "driver", "url", "user", "password", false, sourceParameters);
+    try (BasicDataSource dataSource = JdbcStoragePlugin.initDataSource(config)) {
+      // "abc" parameter will be ignored
+      assertEquals(5, dataSource.getMaxIdle());
+      assertFalse(dataSource.getCacheState());
+      assertNull(dataSource.getValidationQuery());
+    }
+  }
+
+  @Test
+  public void testInitWithIncorrectSourceParameterValue() {
+    Map<String, Object> sourceParameters = new HashMap<>();
+    sourceParameters.put("maxIdle", "abc");
+    JdbcStorageConfig config = new JdbcStorageConfig(
+      "driver", "url", "user", "password", false, sourceParameters);
+
+    thrown.expect(UserException.class);
+    thrown.expectMessage(UserBitShared.DrillPBError.ErrorType.CONNECTION.name());
+
+    JdbcStoragePlugin.initDataSource(config);
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index d1bbc09..ed7a243 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.drill.categories.JdbcStorageTest;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
@@ -36,11 +38,8 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
 
 /**
  * JDBC storage plugin tests against H2.
@@ -63,7 +62,10 @@
          FileReader fileReader = new FileReader(scriptFile.getFile())) {
       RunScript.execute(connection, fileReader);
     }
-    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString, "root", "root", true);
+    Map<String, Object> sourceParameters =  new HashMap<>();
+    sourceParameters.put("maxIdle", 5);
+    sourceParameters.put("maxTotal", 5);
+    JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("org.h2.Driver", connString, "root", "root", true, sourceParameters);
     jdbcStorageConfig.setEnabled(true);
     cluster.defineStoragePlugin(ctx -> new JdbcStoragePlugin(jdbcStorageConfig, ctx, "h2"));
     cluster.defineStoragePlugin(ctx -> new JdbcStoragePlugin(jdbcStorageConfig, ctx, "h2o"));
@@ -112,18 +114,19 @@
   }
 
   @Test
-  public void pushdownJoin() throws Exception {
+  public void pushDownJoin() throws Exception {
     String query = "select x.person_id from (select person_id from h2.tmp.drill_h2_test.person) x "
             + "join (select person_id from h2.tmp.drill_h2_test.person) y on x.person_id = y.person_id ";
 
-    String plan = queryBuilder().sql(query).explainText();
-
-    assertThat("Query plan shouldn't contain Join operator",
-        plan, not(containsString("Join")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Join")
+        .match();
   }
 
   @Test
-  public void pushdownJoinAndFilterPushDown() throws Exception {
+  public void pushDownJoinAndFilterPushDown() throws Exception {
     String query = "select * from \n" +
         "h2.tmp.drill_h2_test.person e\n" +
         "INNER JOIN \n" +
@@ -131,25 +134,27 @@
         "ON e.FIRST_NAME = s.FIRST_NAME\n" +
         "WHERE e.LAST_NAME > 'hello'";
 
-    String plan = queryBuilder().sql(query).explainText();
-
-    assertThat("Query plan shouldn't contain Join operator",
-        plan, not(containsString("Join")));
-    assertThat("Query plan shouldn't contain Filter operator",
-        plan, not(containsString("Filter")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Join")
+        .exclude("Filter")
+        .match();
   }
 
   @Test
-  public void pushdownAggregation() throws Exception {
+  public void pushDownAggregation() throws Exception {
     String query = "select count(*) from h2.tmp.drill_h2_test.person";
-    String plan = queryBuilder().sql(query).explainText();
 
-    assertThat("Query plan shouldn't contain Aggregate operator",
-        plan, not(containsString("Aggregate")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Aggregate")
+        .match();
   }
 
   @Test
-  public void pushdownDoubleJoinAndFilter() throws Exception {
+  public void pushDownDoubleJoinAndFilter() throws Exception {
     String query = "select * from \n" +
         "h2.tmp.drill_h2_test.person e\n" +
         "INNER JOIN \n" +
@@ -160,21 +165,22 @@
         "ON e.person_ID = ed.person_ID\n" +
         "WHERE s.first_name > 'abc' and ed.first_name > 'efg'";
 
-    String plan = queryBuilder().sql(query).explainText();
-
-    assertThat("Query plan shouldn't contain Join operator",
-        plan, not(containsString("Join")));
-    assertThat("Query plan shouldn't contain Filter operator",
-        plan, not(containsString("Filter")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Join")
+        .exclude("Filter")
+        .match();
   }
 
   @Test // DRILL-7340
-  public void twoPluginsPredicatesPushdown() throws Exception {
+  public void twoPluginsPredicatesPushDown() throws Exception {
     String query = "SELECT * " +
         "FROM h2.tmp.drill_h2_test.person l " +
         "INNER JOIN h2o.tmp.drill_h2_test.person r " +
         "ON l.person_id = r.person_id " +
         "WHERE l.first_name = 'first_name_1' AND r.last_name = 'last_name_1'";
+
     queryBuilder()
         .sql(query)
         .planMatcher()
@@ -208,12 +214,14 @@
   }
 
   @Test
-  public void pushdownFilter() throws Exception {
+  public void pushDownFilter() throws Exception {
     String query = "select * from h2.tmp.drill_h2_test.person where person_ID = 1";
-    String plan = queryBuilder().sql(query).explainText();
 
-    assertThat("Query plan shouldn't contain Filter operator",
-        plan, not(containsString("Filter")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Filter")
+        .match();
   }
 
   @Test
@@ -247,7 +255,6 @@
   public void showTablesForInformationSchema() throws Exception {
     run("USE h2.tmp.`INFORMATION_SCHEMA`");
     String sql = "SHOW TABLES";
-    queryBuilder().sql(sql).printCsv();
     testBuilder()
         .sqlQuery(sql)
         .unOrdered()
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index 03ba794..28b993b 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -71,7 +71,7 @@
 
     JdbcStorageConfig jdbcStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver",
         String.format("jdbc:mysql://localhost:%s/%s?useJDBCCompliantTimezoneShift=true", mysqlPort, mysqlDBName),
-        "mysqlUser", "mysqlPass", false);
+        "mysqlUser", "mysqlPass", false, null);
     jdbcStorageConfig.setEnabled(true);
 
     cluster.defineStoragePlugin(ctx -> new JdbcStoragePlugin(jdbcStorageConfig, ctx, "mysql"));
@@ -80,7 +80,7 @@
       // adds storage plugin with case insensitive table names
       JdbcStorageConfig jdbcCaseSensitiveStorageConfig = new JdbcStorageConfig("com.mysql.cj.jdbc.Driver",
           String.format("jdbc:mysql://localhost:%s/%s?useJDBCCompliantTimezoneShift=true", mysqlPort, mysqlDBName),
-          "mysqlUser", "mysqlPass", true);
+          "mysqlUser", "mysqlPass", true, null);
       jdbcCaseSensitiveStorageConfig.setEnabled(true);
       cluster.defineStoragePlugin(ctx -> new JdbcStoragePlugin(jdbcCaseSensitiveStorageConfig, ctx, "mysqlCaseInsensitive"));
     }
@@ -171,7 +171,7 @@
   }
 
   @Test
-  public void pushdownJoin() throws Exception {
+  public void pushDownJoin() throws Exception {
     String query = "select x.person_id from (select person_id from mysql.`drill_mysql_test`.person) x "
             + "join (select person_id from mysql.`drill_mysql_test`.person) y on x.person_id = y.person_id";
     queryBuilder()
@@ -182,7 +182,7 @@
   }
 
   @Test
-  public void pushdownJoinAndFilterPushDown() throws Exception {
+  public void pushDownJoinAndFilterPushDown() throws Exception {
     String query = "select * from " +
             "mysql.`drill_mysql_test`.person e " +
             "INNER JOIN " +
@@ -205,10 +205,12 @@
   }
 
   @Test
-  public void emptyOutput() throws Exception {
+  public void emptyOutput() {
     String query = "select * from mysql.`drill_mysql_test`.person e limit 0";
 
-    run(query);
+    testBuilder()
+        .sqlQuery(query)
+        .expectsEmptyResultSet();
   }
 
   @Test
diff --git a/contrib/udfs/pom.xml b/contrib/udfs/pom.xml
index f67dab5..ed2bac2 100644
--- a/contrib/udfs/pom.xml
+++ b/contrib/udfs/pom.xml
@@ -21,7 +21,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
-<parent>
+  <parent>
     <artifactId>drill-contrib-parent</artifactId>
     <groupId>org.apache.drill.contrib</groupId>
     <version>1.18.0-SNAPSHOT</version>
@@ -129,4 +129,4 @@
     </plugins>
   </build>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 67c93e6..22c5ffd 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -21,7 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public abstract class StoragePluginConfig {
 
   private Boolean enabled;
@@ -59,5 +59,4 @@
   public String getValue(String key) {
     return null;
   }
-
 }
diff --git a/pom.xml b/pom.xml
index a7d87ab..6ac0cb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,8 @@
     <joda.version>2.10.5</joda.version>
     <javax.el.version>3.0.0</javax.el.version>
     <surefire.version>3.0.0-M4</surefire.version>
-    <commons.compress>1.19</commons.compress>
+    <commons.compress.version>1.19</commons.compress.version>
+    <commons.dbcp2.version>2.7.0</commons.dbcp2.version>
   </properties>
 
   <scm>
@@ -1838,7 +1839,18 @@
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-compress</artifactId>
-        <version>${commons.compress}</version>
+        <version>${commons.compress.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-dbcp2</artifactId>
+        <version>${commons.dbcp2.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
     </dependencies>
   </dependencyManagement>