DRILL-7938: Convert JDBC Storage Plugin to EVF
* Initial Commit
* Most unit tests passing
* Null column issue fixed
* All unit tests passing
* Ignored h2 unit test due to improper timezone support
* Code cleanup
* Removed extra character
* Bump HikariCP to version 4.0.3
* Addressed review comments. Precision and Scale remaining
* Removed unused imports
* Addressed precision and scale comment
* Updated Timezone for h2 tests
* Removed Whitespace
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
deleted file mode 100755
index f5db0b7..0000000
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.jdbc;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
- @Override
- public ScanBatch getBatch(ExecutorFragmentContext context, JdbcSubScan config,
- List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- JdbcStoragePlugin plugin = config.getPlugin();
- RecordReader reader = new JdbcRecordReader(plugin.getDataSource(),
- config.getSql(), plugin.getName(), config.getColumns());
- return new ScanBatch(config, context, Collections.singletonList(reader));
- }
-}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
new file mode 100644
index 0000000..8a86215
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchReader.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.jdbc.writers.JdbcBigintWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcBitWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcColumnWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcDateWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcDoubleWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcFloatWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcIntWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcTimeWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcTimestampWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcVarbinaryWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcVarcharWriter;
+import org.apache.drill.exec.store.jdbc.writers.JdbcVardecimalWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
+
+public class JdbcBatchReader implements ManagedReader<SchemaNegotiator> {
+
+ private static final Logger logger = LoggerFactory.getLogger(JdbcBatchReader.class);
+ private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
+ private final DataSource source;
+ private final String sql;
+ private final List<SchemaPath> columns;
+ private Connection connection;
+ private PreparedStatement statement;
+ private ResultSet resultSet;
+ private RowSetLoader rowWriter;
+ private CustomErrorContext errorContext;
+ private List<JdbcColumnWriter> columnWriters;
+ private List<JdbcColumn> jdbcColumns;
+
+
+ public JdbcBatchReader(DataSource source, String sql, List<SchemaPath> columns) {
+ this.source = source;
+ this.sql = sql;
+ this.columns = columns;
+ }
+
+ /*
+ * This map maps JDBC data types to their Drill equivalents. The basic strategy is that if there
+ * is a Drill equivalent, then do the mapping as expected. All flavors of INT (SMALLINT, TINYINT etc)
+ * are mapped to INT in Drill, with the exception of BIGINT.
+ *
+ * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+ * mapped to VARBINARY.
+ *
+ */
+ static {
+ JDBC_TYPE_MAPPINGS = ImmutableMap.<Integer, MinorType>builder()
+ .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
+ .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
+ .put(java.sql.Types.TINYINT, MinorType.INT)
+ .put(java.sql.Types.SMALLINT, MinorType.INT)
+ .put(java.sql.Types.INTEGER, MinorType.INT)
+ .put(java.sql.Types.BIGINT, MinorType.BIGINT)
+
+ .put(java.sql.Types.CHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.VARCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.CLOB, MinorType.VARCHAR)
+
+ .put(java.sql.Types.NCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR)
+ .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR)
+
+ .put(java.sql.Types.VARBINARY, MinorType.VARBINARY)
+ .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY)
+ .put(java.sql.Types.BLOB, MinorType.VARBINARY)
+
+ .put(java.sql.Types.NUMERIC, MinorType.FLOAT8)
+ .put(java.sql.Types.DECIMAL, MinorType.VARDECIMAL)
+ .put(java.sql.Types.REAL, MinorType.FLOAT8)
+
+ .put(java.sql.Types.DATE, MinorType.DATE)
+ .put(java.sql.Types.TIME, MinorType.TIME)
+ .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP)
+
+ .put(java.sql.Types.BOOLEAN, MinorType.BIT)
+ .put(java.sql.Types.BIT, MinorType.BIT)
+
+ .build();
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+
+ this.errorContext = negotiator.parentErrorContext();
+ try {
+ connection = source.getConnection();
+ statement = connection.prepareStatement(sql);
+ resultSet = statement.executeQuery();
+
+ TupleMetadata drillSchema = buildSchema();
+ negotiator.tableSchema(drillSchema, true);
+ ResultSetLoader resultSetLoader = negotiator.build();
+
+ // Create ScalarWriters
+ rowWriter = resultSetLoader.writer();
+ populateWriterArray();
+
+ } catch (SQLException e) {
+ throw UserException.dataReadError(e)
+ .message("The JDBC storage plugin failed while trying setup the SQL query. ")
+ .addContext("Sql", sql)
+ .addContext(errorContext)
+ .build(logger);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ while (!rowWriter.isFull()) {
+ if (!processRow()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean processRow() {
+ try {
+ if (!resultSet.next()) {
+ return false;
+ }
+ rowWriter.start();
+ // Process results
+ for (JdbcColumnWriter writer : columnWriters) {
+ writer.load(resultSet);
+ }
+ rowWriter.save();
+ } catch (SQLException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failure while attempting to read from database.")
+ .addContext("Sql", sql)
+ .addContext(errorContext)
+ .build(logger);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(resultSet, statement, connection);
+ }
+
+ private TupleMetadata buildSchema() throws SQLException {
+ SchemaBuilder builder = new SchemaBuilder();
+ ResultSetMetaData meta = resultSet.getMetaData();
+ jdbcColumns = new ArrayList<>();
+
+ int columnsCount = meta.getColumnCount();
+
+ if (columns.size() != columnsCount) {
+ throw UserException
+ .validationError()
+ .message(
+ "Expected columns count differs from the returned one.\n" +
+ "Expected columns: %s\n" +
+ "Returned columns count: %s",
+ columns, columnsCount)
+ .addContext("Sql", sql)
+ .addContext(errorContext)
+ .build(logger);
+ }
+
+ for (int i = 1; i <= columnsCount; i++) {
+ String name = columns.get(i - 1).getRootSegmentPath();
+ // column index in ResultSetMetaData starts from 1
+ int jdbcType = meta.getColumnType(i);
+ int width = Math.min(meta.getPrecision(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision());
+ int scale = Math.min(meta.getScale(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale());
+
+ MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
+ if (minorType == null) {
+ logger.warn("Ignoring column that is unsupported.", UserException
+ .unsupportedError()
+ .message(
+ "A column you queried has a data type that is not currently supported by the JDBC storage plugin. "
+ + "The column's name was %s and its JDBC data type was %s. ",
+ name,
+ nameFromType(jdbcType))
+ .addContext("Sql", sql)
+ .addContext("Column Name", name)
+ .addContext(errorContext)
+ .build(logger));
+ continue;
+ }
+
+ jdbcColumns.add(new JdbcColumn(name, minorType, i));
+ // Precision and scale are passed for all readers whether they are needed or not.
+ builder.addNullable(name, minorType, width, scale);
+ }
+
+ return builder.buildSchema();
+ }
+
+ private void populateWriterArray() {
+ columnWriters = new ArrayList<>();
+
+ for (JdbcColumn col : jdbcColumns) {
+ switch (col.type) {
+ case VARCHAR:
+ columnWriters.add(new JdbcVarcharWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case FLOAT4:
+ columnWriters.add(new JdbcFloatWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case FLOAT8:
+ columnWriters.add(new JdbcDoubleWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case INT:
+ columnWriters.add(new JdbcIntWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case BIGINT:
+ columnWriters.add(new JdbcBigintWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case DATE:
+ columnWriters.add(new JdbcDateWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case TIME:
+ columnWriters.add(new JdbcTimeWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case TIMESTAMP:
+ columnWriters.add(new JdbcTimestampWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case VARBINARY:
+ columnWriters.add(new JdbcVarbinaryWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case BIT:
+ columnWriters.add(new JdbcBitWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ case VARDECIMAL:
+ columnWriters.add(new JdbcVardecimalWriter(col.colName, rowWriter, col.colPosition));
+ break;
+ default:
+ logger.warn("Unsupported data type {} found at column {}", col.type.getDescriptorForType(), col.colName);
+ }
+ }
+ }
+
+ private static String nameFromType(int javaSqlType) {
+ try {
+ for (Field f : java.sql.Types.class.getFields()) {
+ if (java.lang.reflect.Modifier.isStatic(f.getModifiers()) &&
+ f.getType() == int.class &&
+ f.getInt(null) == javaSqlType) {
+ return f.getName();
+ }
+ }
+ } catch (IllegalArgumentException | IllegalAccessException e) {
+ logger.debug("Unable to SQL type {} into String: {}", javaSqlType, e.getMessage());
+ }
+
+ return Integer.toString(javaSqlType);
+ }
+
+ public static class JdbcColumn {
+ final String colName;
+ final MinorType type;
+ final int colPosition;
+
+ public JdbcColumn (String colName, MinorType type, int colPosition) {
+ this.colName = colName;
+ this.type = type;
+ this.colPosition = colPosition;
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
deleted file mode 100644
index a77de8d..0000000
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.jdbc;
-
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Calendar;
-import java.util.List;
-import java.util.TimeZone;
-
-import javax.sql.DataSource;
-
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableDateVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableTimeVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.NullableVarDecimalVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
-
-class JdbcRecordReader extends AbstractRecordReader {
-
- private static final Logger logger = LoggerFactory.getLogger(JdbcRecordReader.class);
-
- private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
- private final DataSource source;
- private ResultSet resultSet;
- private final String storagePluginName;
- private Connection connection;
- private PreparedStatement statement;
- private final String sql;
- private ImmutableList<ValueVector> vectors;
- private ImmutableList<Copier<?>> copiers;
- private final List<SchemaPath> columns;
-
- public JdbcRecordReader(DataSource source, String sql, String storagePluginName, List<SchemaPath> columns) {
- this.source = source;
- this.sql = sql;
- this.storagePluginName = storagePluginName;
- this.columns = columns;
- }
-
- static {
- JDBC_TYPE_MAPPINGS = ImmutableMap.<Integer, MinorType>builder()
- .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
- .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
- .put(java.sql.Types.TINYINT, MinorType.INT)
- .put(java.sql.Types.SMALLINT, MinorType.INT)
- .put(java.sql.Types.INTEGER, MinorType.INT)
- .put(java.sql.Types.BIGINT, MinorType.BIGINT)
-
- .put(java.sql.Types.CHAR, MinorType.VARCHAR)
- .put(java.sql.Types.VARCHAR, MinorType.VARCHAR)
- .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR)
- .put(java.sql.Types.CLOB, MinorType.VARCHAR)
-
- .put(java.sql.Types.NCHAR, MinorType.VARCHAR)
- .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR)
- .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR)
-
- .put(java.sql.Types.VARBINARY, MinorType.VARBINARY)
- .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY)
- .put(java.sql.Types.BLOB, MinorType.VARBINARY)
-
- .put(java.sql.Types.NUMERIC, MinorType.FLOAT8)
- .put(java.sql.Types.DECIMAL, MinorType.VARDECIMAL)
- .put(java.sql.Types.REAL, MinorType.FLOAT8)
-
- .put(java.sql.Types.DATE, MinorType.DATE)
- .put(java.sql.Types.TIME, MinorType.TIME)
- .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP)
-
- .put(java.sql.Types.BOOLEAN, MinorType.BIT)
-
- .put(java.sql.Types.BIT, MinorType.BIT)
-
- .build();
- }
-
- private static String nameFromType(int javaSqlType) {
- try {
- for (Field f : java.sql.Types.class.getFields()) {
- if (java.lang.reflect.Modifier.isStatic(f.getModifiers()) &&
- f.getType() == int.class &&
- f.getInt(null) == javaSqlType) {
- return f.getName();
- }
- }
- } catch (IllegalArgumentException | IllegalAccessException e) {
- logger.trace("Unable to SQL type {} into String: {}", javaSqlType, e.getMessage());
- }
-
- return Integer.toString(javaSqlType);
-
- }
-
- private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) {
- switch (jdbcType) {
- case java.sql.Types.BIGINT:
- return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator());
- case java.sql.Types.FLOAT:
- return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator());
- case java.sql.Types.DOUBLE:
- case java.sql.Types.NUMERIC:
- case java.sql.Types.REAL:
- return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator());
- case java.sql.Types.TINYINT:
- case java.sql.Types.SMALLINT:
- case java.sql.Types.INTEGER:
- return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator());
- case java.sql.Types.CHAR:
- case java.sql.Types.VARCHAR:
- case java.sql.Types.LONGVARCHAR:
- case java.sql.Types.CLOB:
- case java.sql.Types.NCHAR:
- case java.sql.Types.NVARCHAR:
- case java.sql.Types.LONGNVARCHAR:
- return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator());
- case java.sql.Types.VARBINARY:
- case java.sql.Types.LONGVARBINARY:
- case java.sql.Types.BLOB:
- return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator());
- case java.sql.Types.DATE:
- return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator());
- case java.sql.Types.TIME:
- return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator());
- case java.sql.Types.TIMESTAMP:
- return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator());
- case java.sql.Types.BOOLEAN:
- case java.sql.Types.BIT:
- return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator());
- case java.sql.Types.DECIMAL:
- return new DecimalCopier(offset, result, (NullableVarDecimalVector.Mutator) v.getMutator());
- default:
- throw new IllegalArgumentException("Unknown how to handle vector.");
- }
- }
-
- @Override
- public void setup(OperatorContext operatorContext, OutputMutator output) {
- try {
- connection = source.getConnection();
- statement = connection.prepareStatement(sql);
- resultSet = statement.executeQuery();
-
- ResultSetMetaData meta = resultSet.getMetaData();
- int columnsCount = meta.getColumnCount();
- if (columns.size() != columnsCount) {
- throw UserException
- .validationError()
- .message(
- "Expected columns count differs from the returned one.\n" +
- "Expected columns: %s\n" +
- "Returned columns count: %s",
- columns, columnsCount)
- .addContext("Sql", sql)
- .addContext("Plugin", storagePluginName)
- .build(logger);
- }
- ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
- ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
-
- for (int i = 1; i <= columnsCount; i++) {
- String name = columns.get(i - 1).getRootSegmentPath();
- // column index in ResultSetMetaData starts from 1
- int jdbcType = meta.getColumnType(i);
- int width = Math.min(meta.getPrecision(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision());
- int scale = Math.min(meta.getScale(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale());
- MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
- if (minorType == null) {
- logger.warn("Ignoring column that is unsupported.", UserException
- .unsupportedError()
- .message(
- "A column you queried has a data type that is not currently supported by the JDBC storage plugin. "
- + "The column's name was %s and its JDBC data type was %s. ",
- name,
- nameFromType(jdbcType))
- .addContext("Sql", sql)
- .addContext("Column Name", name)
- .addContext("Plugin", storagePluginName)
- .build(logger));
- continue;
- }
-
- MajorType type = MajorType.newBuilder()
- .setMode(TypeProtos.DataMode.OPTIONAL)
- .setMinorType(minorType)
- .setScale(scale)
- .setPrecision(width)
- .build();
- MaterializedField field = MaterializedField.create(name, type);
- Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
- minorType, type.getMode());
- ValueVector vector = output.addField(field, clazz);
- vectorBuilder.add(vector);
- copierBuilder.add(getCopier(jdbcType, i, resultSet, vector));
- }
-
- vectors = vectorBuilder.build();
- copiers = copierBuilder.build();
-
- } catch (SQLException | SchemaChangeException e) {
- throw UserException.dataReadError(e)
- .message("The JDBC storage plugin failed while trying setup the SQL query. ")
- .addContext("Sql", sql)
- .addContext("Plugin", storagePluginName)
- .build(logger);
- }
- }
-
- @Override
- public int next() {
- int counter = 0;
- try {
- while (counter < 4095) { // loop at 4095 since nullables use one more than record count and we
- // allocate on powers of two.
- if (!resultSet.next()) {
- break;
- }
- for (Copier<?> c : copiers) {
- c.copy(counter);
- }
- counter++;
- }
- } catch (SQLException e) {
- throw UserException
- .dataReadError(e)
- .message("Failure while attempting to read from database.")
- .addContext("Sql", sql)
- .addContext("Plugin", storagePluginName)
- .build(logger);
- }
-
- int valueCount = Math.max(counter, 0);
- for (ValueVector vv : vectors) {
- vv.getMutator().setValueCount(valueCount);
- }
-
- return valueCount;
- }
-
- @Override
- public void close() {
- AutoCloseables.closeSilently(resultSet, statement, connection);
- }
-
- @Override
- public String toString() {
- return "JdbcRecordReader[sql=" + sql
- + ", Plugin=" + storagePluginName
- + "]";
- }
-
- private abstract static class Copier<T extends ValueVector.Mutator> {
- final int columnIndex;
- final ResultSet result;
- final T mutator;
-
- Copier(int columnIndex, ResultSet result, T mutator) {
- this.columnIndex = columnIndex;
- this.result = result;
- this.mutator = mutator;
- }
-
- abstract void copy(int index) throws SQLException;
- }
-
- private static class IntCopier extends Copier<NullableIntVector.Mutator> {
-
- IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
- super(offset, set, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getInt(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
- }
- }
-
- private static class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
-
- BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
- super(offset, set, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getLong(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
- }
- }
-
- private static class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
-
- Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getFloat(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
- }
- }
-
- private static class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
-
- Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getDouble(columnIndex));
- if (result.wasNull()) {
- mutator.setNull(index);
- }
- }
- }
-
- private static class DecimalCopier extends Copier<NullableVarDecimalVector.Mutator> {
-
- DecimalCopier(int columnIndex, ResultSet result, NullableVarDecimalVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- BigDecimal decimal = result.getBigDecimal(columnIndex);
- if (decimal != null) {
- if (decimal.precision() > DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision()
- || decimal.scale() > DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale()) {
- throw UserException.unsupportedError()
- .message("Drill doesn't support reading values with precision or scale larger than 38.\n" +
- "Please use round() UDF to obtain results with supported precision")
- .addContext("Column index", index)
- .build(logger);
- }
- mutator.setSafe(index, decimal);
- }
- }
- }
-
- private static class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
-
- VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- String val = result.getString(columnIndex);
- if (val != null) {
- byte[] record = val.getBytes(Charsets.UTF_8);
- mutator.setSafe(index, record, 0, record.length);
- }
- }
- }
-
- private static class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
-
- VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- byte[] record = result.getBytes(columnIndex);
- if (record != null) {
- mutator.setSafe(index, record, 0, record.length);
- }
- }
- }
-
- private static class DateCopier extends Copier<NullableDateVector.Mutator> {
-
- private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-
- DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- Date date = result.getDate(columnIndex, calendar);
- if (date != null) {
- mutator.setSafe(index, date.getTime());
- }
- }
- }
-
- private static class TimeCopier extends Copier<NullableTimeVector.Mutator> {
-
- private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-
- TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- Time time = result.getTime(columnIndex, calendar);
- if (time != null) {
- mutator.setSafe(index, (int) time.getTime());
- }
- }
- }
-
- private static class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
-
- private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-
- TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- Timestamp stamp = result.getTimestamp(columnIndex, calendar);
- if (stamp != null) {
- mutator.setSafe(index, stamp.getTime());
- }
- }
- }
-
- private static class BitCopier extends Copier<NullableBitVector.Mutator> {
-
- BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
- super(columnIndex, result, mutator);
- }
-
- @Override
- void copy(int index) throws SQLException {
- mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0);
- if (result.wasNull()) {
- mutator.setNull(index);
- }
- }
- }
-}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
new file mode 100644
index 0000000..6a727579
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcScanBatchCreator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+
+public class JdbcScanBatchCreator implements BatchCreator<JdbcSubScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+ JdbcSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+
+ try {
+ ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private ScanFrameworkBuilder createBuilder(OptionManager options, JdbcSubScan subScan) {
+ JdbcStorageConfig config = subScan.getConfig();
+ ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+ builder.projection(subScan.getColumns());
+ builder.setUserName(subScan.getUserName());
+ JdbcStoragePlugin plugin = subScan.getPlugin();
+ List<ManagedReader<SchemaNegotiator>> readers =
+ Collections.singletonList(new JdbcBatchReader(plugin.getDataSource(), subScan.getSql(), subScan.getColumns()));
+
+ ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
+ builder.setReaderFactory(readerFactory);
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ return builder;
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index def31fa..f08f4c1 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.jdbc;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -72,7 +73,7 @@
return columns;
}
- public StoragePluginConfig getConfig() {
+ public JdbcStorageConfig getConfig() {
return plugin.getConfig();
}
@@ -80,4 +81,12 @@
public JdbcStoragePlugin getPlugin() {
return plugin;
}
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("sql", sql)
+ .field("columns", columns)
+ .toString();
+ }
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
new file mode 100644
index 0000000..09b7d12
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBigintWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcBigintWriter extends JdbcColumnWriter {
+
+ public JdbcBigintWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ boolean b = results.wasNull();
+ if (! results.wasNull()) {
+ long value = results.getLong(columnIndex);
+ columnWriter.setLong(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
new file mode 100644
index 0000000..29d688f
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcBitWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcBitWriter extends JdbcColumnWriter {
+
+ public JdbcBitWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ if (! results.wasNull()) {
+ boolean value = results.getBoolean(columnIndex);
+ columnWriter.setBoolean(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcColumnWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcColumnWriter.java
new file mode 100644
index 0000000..730828b
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcColumnWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public abstract class JdbcColumnWriter {
+
+ final String colName;
+ ScalarWriter columnWriter;
+ RowSetLoader rowWriter;
+ int columnIndex;
+
+ public JdbcColumnWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ this.colName = colName;
+ this.rowWriter = rowWriter;
+ this.columnWriter = rowWriter.scalar(colName);
+ this.columnIndex = columnIndex;
+ }
+
+ public void load(ResultSet resultSet) throws SQLException {}
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDateWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDateWriter.java
new file mode 100644
index 0000000..b050864
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDateWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcDateWriter extends JdbcColumnWriter {
+
+ public JdbcDateWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ Date value = results.getDate(columnIndex);
+ if (value != null) {
+ columnWriter.setDate(value.toLocalDate());
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
new file mode 100644
index 0000000..95d9402
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcDoubleWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcDoubleWriter extends JdbcColumnWriter {
+
+ public JdbcDoubleWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ if (!results.wasNull()) {
+ double value = results.getDouble(columnIndex);
+ columnWriter.setDouble(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
new file mode 100644
index 0000000..fd2188d
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcFloatWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcFloatWriter extends JdbcColumnWriter {
+
+ public JdbcFloatWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ if (!results.wasNull()) {
+ float value = results.getFloat(columnIndex);
+ columnWriter.setFloat(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
new file mode 100644
index 0000000..5b4704c
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcIntWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcIntWriter extends JdbcColumnWriter {
+
+ public JdbcIntWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ if (!results.wasNull()) {
+ int value = results.getInt(columnIndex);
+ columnWriter.setInt(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimeWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimeWriter.java
new file mode 100644
index 0000000..a989995
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimeWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+
+public class JdbcTimeWriter extends JdbcColumnWriter {
+
+ public JdbcTimeWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ Time value = results.getTime(columnIndex);
+ if (value != null) {
+ columnWriter.setTime(value.toLocalTime());
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimestampWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimestampWriter.java
new file mode 100644
index 0000000..885b0c8
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcTimestampWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+public class JdbcTimestampWriter extends JdbcColumnWriter {
+
+ public JdbcTimestampWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ Timestamp value = results.getTimestamp(columnIndex);
+ if (value != null) {
+ columnWriter.setTimestamp(value.toInstant());
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarbinaryWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarbinaryWriter.java
new file mode 100644
index 0000000..24d1c59
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarbinaryWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcVarbinaryWriter extends JdbcColumnWriter {
+
+ public JdbcVarbinaryWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ byte[] value = results.getBytes(columnIndex);
+ if (value != null) {
+ columnWriter.setBytes(value, value.length);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarcharWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarcharWriter.java
new file mode 100644
index 0000000..95e2cd2
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVarcharWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcVarcharWriter extends JdbcColumnWriter {
+
+ public JdbcVarcharWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ String value = results.getString(columnIndex);
+
+ if (Strings.isNullOrEmpty(value)) {
+ columnWriter.setNull();
+ } else {
+ columnWriter.setString(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVardecimalWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVardecimalWriter.java
new file mode 100644
index 0000000..ecdbaab
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/writers/JdbcVardecimalWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.writers;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JdbcVardecimalWriter extends JdbcColumnWriter {
+
+ public JdbcVardecimalWriter(String colName, RowSetLoader rowWriter, int columnIndex) {
+ super(colName, rowWriter, columnIndex);
+ }
+
+ @Override
+ public void load(ResultSet results) throws SQLException {
+ BigDecimal value = results.getBigDecimal(columnIndex);
+ if (value != null) {
+ columnWriter.setDecimal(value);
+ }
+ }
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index 07b142c..94b60cf 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -27,6 +27,7 @@
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.h2.tools.RunScript;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -37,6 +38,7 @@
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -49,10 +51,15 @@
private static final String TABLE_PATH = "jdbcmulti/";
private static final String TABLE_NAME = String.format("%s.`%s`", StoragePluginTestUtils.DFS_PLUGIN_NAME, TABLE_PATH);
+ private static TimeZone defaultTimeZone;
@BeforeClass
public static void init() throws Exception {
startCluster(ClusterFixture.builder(dirTestWatcher));
+ // Force timezone to UTC for these tests.
+ defaultTimeZone = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
dirTestWatcher.copyResourceToRoot(Paths.get(TABLE_PATH));
Class.forName("org.h2.Driver");
String connString = "jdbc:h2:" + dirTestWatcher.getTmpDir().getCanonicalPath();
@@ -71,6 +78,11 @@
cluster.defineStoragePlugin("h2o", jdbcStorageConfig);
}
+ @AfterClass
+ public static void cleanUp() {
+ TimeZone.setDefault(defaultTimeZone);
+ }
+
@Test
public void testCrossSourceMultiFragmentJoin() throws Exception {
try {
diff --git a/pom.xml b/pom.xml
index caf3a20..e567330 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
<surefire.version>3.0.0-M5</surefire.version>
<jna.version>5.8.0</jna.version>
<commons.compress.version>1.20</commons.compress.version>
- <hikari.version>3.4.2</hikari.version>
+ <hikari.version>4.0.3</hikari.version>
<netty.version>4.1.59.Final</netty.version>
<httpclient.version>4.5.13</httpclient.version>
<libthrift.version>0.14.0</libthrift.version>