DRILL-7938: Convert JDBC Storage Plugin to EVF

* Initial Commit

* Most unit tests passing

* Null column issue fixed

* All unit tests passing

* Ignored h2 unit test due to improper timezone support

* Code cleanup

* Removed extra character

* Bump HikariCP to version 4.0.3

* Addressed review comments.  Precision and Scale remaining

* Removed unused imports

* Addressed precision and scale comment

* Updated Timezone for h2 tests

* Removed Whitespace
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
deleted file mode 100755
index f5db0b7..0000000
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.jdbc;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
-  @Override
-  public ScanBatch getBatch(ExecutorFragmentContext context, JdbcSubScan config,
-      List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    JdbcStoragePlugin plugin = config.getPlugin();
-    RecordReader reader = new JdbcRecordReader(plugin.getDataSource(),
-        config.getSql(), plugin.getName(), config.getColumns());
-    return new ScanBatch(config, context, Collections.singletonList(reader));
-  }
-}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
new file mode 100644
index 0000000..8a86215
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.jdbc.writers.JdbcBigintWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcBitWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcColumnWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcDateWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcDoubleWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcFloatWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcIntWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcTimeWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcTimestampWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcVarbinaryWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcVarcharWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcVardecimalWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
+
+public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcBatchReader.class);
+  private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
+  private final DataSource source;
+  private final String sql;
+  private final List<SchemaPath> columns;
+  private Connection connection;
+  private PreparedStatement statement;
+  private ResultSet resultSet;
+  private RowSetLoader rowWriter;
+  private CustomErrorContext errorContext;
+  private List<JdbcColumnWriter> columnWriters;
+  private List<JdbcColumn> jdbcColumns;
+
+
+  public JdbcBatchReader(DataSource source, String sql, List<SchemaPath> columns) {
+    this.source = source;
+    this.sql = sql;
+    this.columns = columns;
+  }
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.  All flavors of INT (SMALLINT, TINYINT etc)
+   * are mapped to INT in Drill, with the exception of BIGINT.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+   *
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<Integer, MinorType>builder()
+      .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
+      .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
+      .put(java.sql.Types.TINYINT, MinorType.INT)
+      .put(java.sql.Types.SMALLINT, MinorType.INT)
+      .put(java.sql.Types.INTEGER, MinorType.INT)
+      .put(java.sql.Types.BIGINT, MinorType.BIGINT)
+
+      .put(java.sql.Types.CHAR, MinorType.VARCHAR)
+      .put(java.sql.Types.VARCHAR, MinorType.VARCHAR)
+      .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR)
+      .put(java.sql.Types.CLOB, MinorType.VARCHAR)
+
+      .put(java.sql.Types.NCHAR, MinorType.VARCHAR)
+      .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR)
+      .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR)
+
+      .put(java.sql.Types.VARBINARY, MinorType.VARBINARY)
+      .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY)
+      .put(java.sql.Types.BLOB, MinorType.VARBINARY)
+
+      .put(java.sql.Types.NUMERIC, MinorType.FLOAT8)
+      .put(java.sql.Types.DECIMAL, MinorType.VARDECIMAL)
+      .put(java.sql.Types.REAL, MinorType.FLOAT8)
+
+      .put(java.sql.Types.DATE, MinorType.DATE)
+      .put(java.sql.Types.TIME, MinorType.TIME)
+      .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP)
+
+      .put(java.sql.Types.BOOLEAN, MinorType.BIT)
+      .put(java.sql.Types.BIT, MinorType.BIT)
+
+      .build();
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+
+    this.errorContext = negotiator.parentErrorContext();
+    try {
+      connection = source.getConnection();
+      statement = connection.prepareStatement(sql);
+      resultSet = statement.executeQuery();
+
+      TupleMetadata drillSchema = buildSchema();
+      negotiator.tableSchema(drillSchema, true);
+      ResultSetLoader resultSetLoader = negotiator.build();
+
+      // Create ScalarWriters
+      rowWriter = resultSetLoader.writer();
+      populateWriterArray();
+
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying setup the SQL query. ")
+        .addContext("Sql", sql)
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean processRow() {
+    try {
+      if (!resultSet.next()) {
+        return false;
+      }
+      rowWriter.start();
+      // Process results
+      for (JdbcColumnWriter writer : columnWriters) {
+        writer.load(resultSet);
+      }
+      rowWriter.save();
+    } catch (SQLException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failure while attempting to read from database.")
+        .addContext("Sql", sql)
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(resultSet, statement, connection);
+  }
+
+  private TupleMetadata buildSchema() throws SQLException {
+    SchemaBuilder builder = new SchemaBuilder();
+    ResultSetMetaData meta = resultSet.getMetaData();
+    jdbcColumns = new ArrayList<>();
+
+    int columnsCount = meta.getColumnCount();
+
+    if (columns.size() != columnsCount) {
+      throw UserException
+        .validationError()
+        .message(
+          "Expected columns count differs from the returned one.\n" +
+            "Expected columns: %s\n" +
+            "Returned columns count: %s",
+          columns, columnsCount)
+        .addContext("Sql", sql)
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    for (int i = 1; i <= columnsCount; i++) {
+      String name = columns.get(i - 1).getRootSegmentPath();
+      // column index in ResultSetMetaData starts from 1
+      int jdbcType = meta.getColumnType(i);
+      int width = Math.min(meta.getPrecision(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision());
+      int scale = Math.min(meta.getScale(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale());
+
+      MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
+      if (minorType == null) {
+        logger.warn("Ignoring column that is unsupported.", UserException
+          .unsupportedError()
+          .message(
+            "A column you queried has a data type that is not currently supported by the JDBC storage plugin. "
+              + "The column's name was %s and its JDBC data type was %s. ",
+            name,
+            nameFromType(jdbcType))
+          .addContext("Sql", sql)
+          .addContext("Column Name", name)
+          .addContext(errorContext)
+          .build(logger));
+        continue;
+      }
+
+      jdbcColumns.add(new JdbcColumn(name, minorType, i));
+      // Precision and scale are passed for all readers whether they are needed or not.
+      builder.addNullable(name, minorType, width, scale);
+    }
+
+    return builder.buildSchema();
+  }
+
+  private void populateWriterArray() {
+    columnWriters = new ArrayList<>();
+
+    for (JdbcColumn col : jdbcColumns) {
+      switch (col.type) {
+        case VARCHAR:
+          columnWriters.add(new JdbcVarcharWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case FLOAT4:
+          columnWriters.add(new JdbcFloatWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case FLOAT8:
+          columnWriters.add(new JdbcDoubleWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case INT:
+          columnWriters.add(new JdbcIntWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case BIGINT:
+          columnWriters.add(new JdbcBigintWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case DATE:
+          columnWriters.add(new JdbcDateWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case TIME:
+          columnWriters.add(new JdbcTimeWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case TIMESTAMP:
+          columnWriters.add(new JdbcTimestampWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case VARBINARY:
+          columnWriters.add(new JdbcVarbinaryWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case BIT:
+          columnWriters.add(new JdbcBitWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        case VARDECIMAL:
+          columnWriters.add(new JdbcVardecimalWriter(col.colName, rowWriter, col.colPosition));
+          break;
+        default:
+          logger.warn("Unsupported data type {} found at column {}", col.type.getDescriptorForType(), col.colName);
+      }
+    }
+  }
+
+  private static String nameFromType(int javaSqlType) {
+    try {
+      for (Field f : java.sql.Types.class.getFields()) {
+        if (java.lang.reflect.Modifier.isStatic(f.getModifiers()) &&
+          f.getType() == int.class &&
+          f.getInt(null) == javaSqlType) {
+          return f.getName();
+        }
+      }
+    } catch (IllegalArgumentException | IllegalAccessException e) {
+      logger.debug("Unable to SQL type {} into String: {}", javaSqlType, e.getMessage());
+    }
+
+    return Integer.toString(javaSqlType);
+  }
+
+  public static class JdbcColumn {
+    final String colName;
+    final MinorType type;
+    final int colPosition;
+
+    public JdbcColumn (String colName, MinorType type, int colPosition) {
+      this.colName = colName;
+      this.type = type;
+      this.colPosition = colPosition;
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
deleted file mode 100644
index a77de8d..0000000
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.jdbc;
-
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Calendar;
-import java.util.List;
-import java.util.TimeZone;
-
-import javax.sql.DataSource;
-
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableDateVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableTimeVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.NullableVarDecimalVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
-
-class JdbcRecordReader extends AbstractRecordReader {
-
-  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordReader.class);
-
-  private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
-  private final DataSource source;
-  private ResultSet resultSet;
-  private final String storagePluginName;
-  private Connection connection;
-  private PreparedStatement statement;
-  private final String sql;
-  private ImmutableList<ValueVector> vectors;
-  private ImmutableList<Copier<?>> copiers;
-  private final List<SchemaPath> columns;
-
-  public JdbcRecordReader(DataSource source, String sql, String storagePluginName, List<SchemaPath> columns) {
-    this.source = source;
-    this.sql = sql;
-    this.storagePluginName = storagePluginName;
-    this.columns = columns;
-  }
-
-  static {
-    JDBC_TYPE_MAPPINGS = ImmutableMap.<Integer, MinorType>builder()
-        .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
-        .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
-        .put(java.sql.Types.TINYINT, MinorType.INT)
-        .put(java.sql.Types.SMALLINT, MinorType.INT)
-        .put(java.sql.Types.INTEGER, MinorType.INT)
-        .put(java.sql.Types.BIGINT, MinorType.BIGINT)
-
-        .put(java.sql.Types.CHAR, MinorType.VARCHAR)
-        .put(java.sql.Types.VARCHAR, MinorType.VARCHAR)
-        .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR)
-        .put(java.sql.Types.CLOB, MinorType.VARCHAR)
-
-        .put(java.sql.Types.NCHAR, MinorType.VARCHAR)
-        .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR)
-        .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR)
-
-        .put(java.sql.Types.VARBINARY, MinorType.VARBINARY)
-        .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY)
-        .put(java.sql.Types.BLOB, MinorType.VARBINARY)
-
-        .put(java.sql.Types.NUMERIC, MinorType.FLOAT8)
-        .put(java.sql.Types.DECIMAL, MinorType.VARDECIMAL)
-        .put(java.sql.Types.REAL, MinorType.FLOAT8)
-
-        .put(java.sql.Types.DATE, MinorType.DATE)
-        .put(java.sql.Types.TIME, MinorType.TIME)
-        .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP)
-
-        .put(java.sql.Types.BOOLEAN, MinorType.BIT)
-
-        .put(java.sql.Types.BIT, MinorType.BIT)
-
-        .build();
-  }
-
-  private static String nameFromType(int javaSqlType) {
-    try {
-      for (Field f : java.sql.Types.class.getFields()) {
-        if (java.lang.reflect.Modifier.isStatic(f.getModifiers()) &&
-            f.getType() == int.class &&
-            f.getInt(null) == javaSqlType) {
-          return f.getName();
-        }
-      }
-    } catch (IllegalArgumentException | IllegalAccessException e) {
-      logger.trace("Unable to SQL type {} into String: {}", javaSqlType, e.getMessage());
-    }
-
-    return Integer.toString(javaSqlType);
-
-  }
-
-  private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) {
-    switch (jdbcType) {
-      case java.sql.Types.BIGINT:
-        return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator());
-      case java.sql.Types.FLOAT:
-        return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator());
-      case java.sql.Types.DOUBLE:
-      case java.sql.Types.NUMERIC:
-      case java.sql.Types.REAL:
-        return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator());
-      case java.sql.Types.TINYINT:
-      case java.sql.Types.SMALLINT:
-      case java.sql.Types.INTEGER:
-        return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator());
-      case java.sql.Types.CHAR:
-      case java.sql.Types.VARCHAR:
-      case java.sql.Types.LONGVARCHAR:
-      case java.sql.Types.CLOB:
-      case java.sql.Types.NCHAR:
-      case java.sql.Types.NVARCHAR:
-      case java.sql.Types.LONGNVARCHAR:
-        return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator());
-      case java.sql.Types.VARBINARY:
-      case java.sql.Types.LONGVARBINARY:
-      case java.sql.Types.BLOB:
-        return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator());
-      case java.sql.Types.DATE:
-        return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator());
-      case java.sql.Types.TIME:
-        return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator());
-      case java.sql.Types.TIMESTAMP:
-        return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator());
-      case java.sql.Types.BOOLEAN:
-      case java.sql.Types.BIT:
-        return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator());
-      case java.sql.Types.DECIMAL:
-        return new DecimalCopier(offset, result, (NullableVarDecimalVector.Mutator) v.getMutator());
-      default:
-        throw new IllegalArgumentException("Unknown how to handle vector.");
-    }
-  }
-
-  @Override
-  public void setup(OperatorContext operatorContext, OutputMutator output) {
-    try {
-      connection = source.getConnection();
-      statement = connection.prepareStatement(sql);
-      resultSet = statement.executeQuery();
-
-      ResultSetMetaData meta = resultSet.getMetaData();
-      int columnsCount = meta.getColumnCount();
-      if (columns.size() != columnsCount) {
-        throw UserException
-            .validationError()
-            .message(
-              "Expected columns count differs from the returned one.\n" +
-                  "Expected columns: %s\n" +
-                  "Returned columns count: %s",
-              columns, columnsCount)
-            .addContext("Sql", sql)
-            .addContext("Plugin", storagePluginName)
-            .build(logger);
-      }
-      ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
-      ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
-
-      for (int i = 1; i <= columnsCount; i++) {
-        String name = columns.get(i - 1).getRootSegmentPath();
-        // column index in ResultSetMetaData starts from 1
-        int jdbcType = meta.getColumnType(i);
-        int width = Math.min(meta.getPrecision(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision());
-        int scale = Math.min(meta.getScale(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale());
-        MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
-        if (minorType == null) {
-          logger.warn("Ignoring column that is unsupported.", UserException
-              .unsupportedError()
-              .message(
-                  "A column you queried has a data type that is not currently supported by the JDBC storage plugin. "
-                      + "The column's name was %s and its JDBC data type was %s. ",
-                  name,
-                  nameFromType(jdbcType))
-              .addContext("Sql", sql)
-              .addContext("Column Name", name)
-              .addContext("Plugin", storagePluginName)
-              .build(logger));
-          continue;
-        }
-
-        MajorType type = MajorType.newBuilder()
-            .setMode(TypeProtos.DataMode.OPTIONAL)
-            .setMinorType(minorType)
-            .setScale(scale)
-            .setPrecision(width)
-            .build();
-        MaterializedField field = MaterializedField.create(name, type);
-        Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
-            minorType, type.getMode());
-        ValueVector vector = output.addField(field, clazz);
-        vectorBuilder.add(vector);
-        copierBuilder.add(getCopier(jdbcType, i, resultSet, vector));
-      }
-
-      vectors = vectorBuilder.build();
-      copiers = copierBuilder.build();
-
-    } catch (SQLException | SchemaChangeException e) {
-      throw UserException.dataReadError(e)
-          .message("The JDBC storage plugin failed while trying setup the SQL query. ")
-          .addContext("Sql", sql)
-          .addContext("Plugin", storagePluginName)
-          .build(logger);
-    }
-  }
-
-  @Override
-  public int next() {
-    int counter = 0;
-    try {
-      while (counter < 4095) { // loop at 4095 since nullables use one more than record count and we
-                                            // allocate on powers of two.
-        if (!resultSet.next()) {
-          break;
-        }
-        for (Copier<?> c : copiers) {
-          c.copy(counter);
-        }
-        counter++;
-      }
-    } catch (SQLException e) {
-      throw UserException
-          .dataReadError(e)
-          .message("Failure while attempting to read from database.")
-          .addContext("Sql", sql)
-          .addContext("Plugin", storagePluginName)
-          .build(logger);
-    }
-
-    int valueCount = Math.max(counter, 0);
-    for (ValueVector vv : vectors) {
-      vv.getMutator().setValueCount(valueCount);
-    }
-
-    return valueCount;
-  }
-
-  @Override
-  public void close() {
-    AutoCloseables.closeSilently(resultSet, statement, connection);
-  }
-
-  @Override
-  public String toString() {
-    return "JdbcRecordReader[sql=" + sql
-        + ", Plugin=" + storagePluginName
-        + "]";
-  }
-
-  private abstract static class Copier<T extends ValueVector.Mutator> {
-    final int columnIndex;
-    final ResultSet result;
-    final T mutator;
-
-    Copier(int columnIndex, ResultSet result, T mutator) {
-      this.columnIndex = columnIndex;
-      this.result = result;
-      this.mutator = mutator;
-    }
-
-    abstract void copy(int index) throws SQLException;
-  }
-
-  private static class IntCopier extends Copier<NullableIntVector.Mutator> {
-
-    IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
-      super(offset, set, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      mutator.setSafe(index, result.getInt(columnIndex));
-      if (result.wasNull()) {
-        mutator.setNull(index);
-      }
-    }
-  }
-
-  private static class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
-
-    BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
-      super(offset, set, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      mutator.setSafe(index, result.getLong(columnIndex));
-      if (result.wasNull()) {
-        mutator.setNull(index);
-      }
-    }
-  }
-
-  private static class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
-
-    Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      mutator.setSafe(index, result.getFloat(columnIndex));
-      if (result.wasNull()) {
-        mutator.setNull(index);
-      }
-    }
-  }
-
-  private static class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
-
-    Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      mutator.setSafe(index, result.getDouble(columnIndex));
-      if (result.wasNull()) {
-        mutator.setNull(index);
-      }
-    }
-  }
-
-  private static class DecimalCopier extends Copier<NullableVarDecimalVector.Mutator> {
-
-    DecimalCopier(int columnIndex, ResultSet result, NullableVarDecimalVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      BigDecimal decimal = result.getBigDecimal(columnIndex);
-      if (decimal != null) {
-        if (decimal.precision() > DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision()
-            || decimal.scale() > DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale()) {
-          throw UserException.unsupportedError()
-              .message("Drill doesn't support reading values with precision or scale larger than 38.\n" +
-                  "Please use round() UDF to obtain results with supported precision")
-              .addContext("Column index", index)
-              .build(logger);
-        }
-        mutator.setSafe(index, decimal);
-      }
-    }
-  }
-
-  private static class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
-
-    VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      String val = result.getString(columnIndex);
-      if (val != null) {
-        byte[] record = val.getBytes(Charsets.UTF_8);
-        mutator.setSafe(index, record, 0, record.length);
-      }
-    }
-  }
-
-  private static class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
-
-    VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      byte[] record = result.getBytes(columnIndex);
-      if (record != null) {
-        mutator.setSafe(index, record, 0, record.length);
-      }
-    }
-  }
-
-  private static class DateCopier extends Copier<NullableDateVector.Mutator> {
-
-    private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-
-    DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      Date date = result.getDate(columnIndex, calendar);
-      if (date != null) {
-        mutator.setSafe(index, date.getTime());
-      }
-    }
-  }
-
-  private static class TimeCopier extends Copier<NullableTimeVector.Mutator> {
-
-    private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-
-    TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      Time time = result.getTime(columnIndex, calendar);
-      if (time != null) {
-        mutator.setSafe(index, (int) time.getTime());
-      }
-    }
-  }
-
-  private static class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
-
-    private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-
-    TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      Timestamp stamp = result.getTimestamp(columnIndex, calendar);
-      if (stamp != null) {
-        mutator.setSafe(index, stamp.getTime());
-      }
-    }
-  }
-
-  private static class BitCopier extends Copier<NullableBitVector.Mutator> {
-
-    BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
-      super(columnIndex, result, mutator);
-    }
-
-    @Override
-    void copy(int index) throws SQLException {
-      mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0);
-      if (result.wasNull()) {
-        mutator.setNull(index);
-      }
-    }
-  }
-}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
new file mode 100644
index 0000000..6a727579
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+
+public class JdbcScanBatchCreator implements BatchCreator<JdbcSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+                                       JdbcSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanFrameworkBuilder createBuilder(OptionManager options, JdbcSubScan subScan) {
+    JdbcStorageConfig config = subScan.getConfig();
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.setUserName(subScan.getUserName());
+    JdbcStoragePlugin plugin = subScan.getPlugin();
+    List<ManagedReader<SchemaNegotiator>> readers =
+      Collections.singletonList(new JdbcBatchReader(plugin.getDataSource(), subScan.getSql(), subScan.getColumns()));
+
+    ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index def31fa..f08f4c1 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -72,7 +73,7 @@
     return columns;
   }
 
-  public StoragePluginConfig getConfig() {
+  public JdbcStorageConfig getConfig() {
     return plugin.getConfig();
   }
 
@@ -80,4 +81,12 @@
   public JdbcStoragePlugin getPlugin() {
     return plugin;
   }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("sql", sql)
+      .field("columns", columns)
+      .toString();
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
new file mode 100644
index 0000000..09b7d12
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcBigintWriter extends JdbcColumnWriter {
+
+  public JdbcBigintWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    boolean b = results.wasNull();
+    if (! results.wasNull()) {
+      long value = results.getLong(columnIndex);
+      columnWriter.setLong(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
new file mode 100644
index 0000000..29d688f
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcBitWriter extends JdbcColumnWriter {
+
+  public JdbcBitWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    if (! results.wasNull()) {
+      boolean value = results.getBoolean(columnIndex);
+      columnWriter.setBoolean(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcColumnWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcColumnWriter.java
new file mode 100644
index 0000000..730828b
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcColumnWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public abstract class JdbcColumnWriter {
+
+  final String colName;
+  ScalarWriter columnWriter;
+  RowSetLoader rowWriter;
+  int columnIndex;
+
+  public JdbcColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    this.colName = colName;
+    this.rowWriter = rowWriter;
+    this.columnWriter = rowWriter.scalar(colName);
+    this.columnIndex = columnIndex;
+  }
+
+  public void load(ResultSet resultSet) throws SQLException {}
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDateWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDateWriter.java
new file mode 100644
index 0000000..b050864
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDateWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcDateWriter extends JdbcColumnWriter {
+
+  public JdbcDateWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    Date value = results.getDate(columnIndex);
+    if (value != null) {
+      columnWriter.setDate(value.toLocalDate());
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
new file mode 100644
index 0000000..95d9402
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcDoubleWriter extends JdbcColumnWriter {
+
+  public JdbcDoubleWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    if (!results.wasNull()) {
+      double value = results.getDouble(columnIndex);
+      columnWriter.setDouble(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
new file mode 100644
index 0000000..fd2188d
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcFloatWriter extends JdbcColumnWriter {
+
+  public JdbcFloatWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    if (!results.wasNull()) {
+      float value = results.getFloat(columnIndex);
+      columnWriter.setFloat(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
new file mode 100644
index 0000000..5b4704c
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcIntWriter extends JdbcColumnWriter {
+
+  public JdbcIntWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    if (!results.wasNull()) {
+      int value = results.getInt(columnIndex);
+      columnWriter.setInt(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimeWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimeWriter.java
new file mode 100644
index 0000000..a989995
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimeWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+
+public class JdbcTimeWriter extends JdbcColumnWriter {
+
+  public JdbcTimeWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    Time value = results.getTime(columnIndex);
+    if (value != null) {
+      columnWriter.setTime(value.toLocalTime());
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimestampWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimestampWriter.java
new file mode 100644
index 0000000..885b0c8
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimestampWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+public class JdbcTimestampWriter extends JdbcColumnWriter {
+
+  public JdbcTimestampWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    Timestamp value = results.getTimestamp(columnIndex);
+    if (value != null) {
+      columnWriter.setTimestamp(value.toInstant());
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarbinaryWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarbinaryWriter.java
new file mode 100644
index 0000000..24d1c59
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarbinaryWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcVarbinaryWriter extends JdbcColumnWriter {
+
+  public JdbcVarbinaryWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    byte[] value = results.getBytes(columnIndex);
+    if (value != null) {
+      columnWriter.setBytes(value, value.length);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarcharWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarcharWriter.java
new file mode 100644
index 0000000..95e2cd2
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarcharWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcVarcharWriter extends JdbcColumnWriter {
+
+  public JdbcVarcharWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    String value = results.getString(columnIndex);
+
+    if (Strings.isNullOrEmpty(value)) {
+      columnWriter.setNull();
+    }  else {
+      columnWriter.setString(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVardecimalWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVardecimalWriter.java
new file mode 100644
index 0000000..ecdbaab
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVardecimalWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcVardecimalWriter extends JdbcColumnWriter {
+
+  public JdbcVardecimalWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+    super(colName, rowWriter, columnIndex);
+  }
+
+  @Override
+  public void load(ResultSet results) throws SQLException {
+    BigDecimal value = results.getBigDecimal(columnIndex);
+    if (value != null) {
+      columnWriter.setDecimal(value);
+    }
+  }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index 07b142c..94b60cf 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -27,6 +27,7 @@
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.h2.tools.RunScript;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -37,6 +38,7 @@
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -49,10 +51,15 @@
 
   private static final String TABLE_PATH = "jdbcmulti/";
   private static final String TABLE_NAME = String.format("%s.`%s`", StoragePluginTestUtils.DFS_PLUGIN_NAME, TABLE_PATH);
+  private static TimeZone defaultTimeZone;
 
   @BeforeClass
   public static void init() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
+    // Force timezone to UTC for these tests.
+    defaultTimeZone = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
     dirTestWatcher.copyResourceToRoot(Paths.get(TABLE_PATH));
     Class.forName("org.h2.Driver");
     String connString = "jdbc:h2:" + dirTestWatcher.getTmpDir().getCanonicalPath();
@@ -71,6 +78,11 @@
     cluster.defineStoragePlugin("h2o", jdbcStorageConfig);
   }
 
+  @AfterClass
+  public static void cleanUp() {
+    TimeZone.setDefault(defaultTimeZone);
+  }
+
   @Test
   public void testCrossSourceMultiFragmentJoin() throws Exception {
     try {
diff --git a/pom.xml b/pom.xml
index caf3a20..e567330 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
     <surefire.version>3.0.0-M5</surefire.version>
     <jna.version>5.8.0</jna.version>
     <commons.compress.version>1.20</commons.compress.version>
-    <hikari.version>3.4.2</hikari.version>
+    <hikari.version>4.0.3</hikari.version>
     <netty.version>4.1.59.Final</netty.version>
     <httpclient.version>4.5.13</httpclient.version>
     <libthrift.version>0.14.0</libthrift.version>