blob: ebe907392a52a26397080249408de879f8e4cd59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.jdbc;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.AbstractRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.jdbc.utils.CreateTableStmtBuilder;
import org.apache.drill.exec.store.jdbc.utils.InsertStatementBuilder;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Record writer that sends Drill records to a JDBC data source as generated SQL text.
 *
 * <p>When the schema is announced via {@link #updateSchema(VectorAccessible)} a
 * CREATE TABLE statement is built and executed. Each record's column values are
 * then appended, as Calcite {@link SqlLiteral}s, to an {@link InsertStatementBuilder};
 * once the number of buffered records reaches the plugin's configured writer batch
 * size the accumulated INSERT statement is executed. Any remaining records are
 * flushed in {@link #cleanup()}, which also closes the connection.
 *
 * <p>The nested converter classes are inner (non-static) classes on purpose: each
 * one writes into this writer's {@code insertStatementBuilder}.
 */
public class JdbcRecordWriter extends AbstractRecordWriter {
  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);

  /** Path of the target table, passed to the CREATE TABLE and INSERT builders. */
  private final List<String> tableIdentifier;
  /** Connection used for every statement issued by this writer; closed in {@link #cleanup()}. */
  private final Connection connection;
  /** SQL dialect obtained from the storage plugin for the given data source. */
  protected final SqlDialect dialect;
  /** Accumulates row values and produces the INSERT statement text. */
  private final InsertStatementBuilder insertStatementBuilder;
  private final JdbcWriter config;
  /** Number of records buffered since the last executed INSERT. */
  private int recordCount;

  /**
   * Creates a writer for the given table and opens a connection to the data source.
   *
   * <p>Note that {@link #getInsertStatementBuilder(List)} is invoked from this
   * constructor, after {@code dialect} has been assigned; overriding
   * implementations must not rely on subclass state.
   *
   * @param source data source from which the dialect is resolved and the connection obtained
   * @param tableIdentifier path of the table to create and insert into
   * @param config writer configuration, used to reach the plugin and its settings
   * @throws UserException (connection error) if a connection cannot be opened
   */
  public JdbcRecordWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
    this.tableIdentifier = tableIdentifier;
    this.dialect = config.getPlugin().getDialect(source);
    this.config = config;
    this.recordCount = 0;
    this.insertStatementBuilder = getInsertStatementBuilder(tableIdentifier);

    try {
      this.connection = source.getConnection();
    } catch (SQLException e) {
      // Pass the SQLException as the cause so the underlying driver error is not lost.
      throw UserException.connectionError(e)
        .message("Unable to open JDBC connection for writing.")
        .addContext(e.getSQLState())
        .build(logger);
    }
  }

  /**
   * Factory hook for the INSERT statement builder; subclasses may override it to
   * supply a different builder.
   *
   * @param tableIdentifier path of the target table
   * @return a builder bound to the target table and this writer's dialect
   */
  protected InsertStatementBuilder getInsertStatementBuilder(List<String> tableIdentifier) {
    return new InsertStatementBuilder(tableIdentifier, dialect);
  }

  /**
   * No-op: this writer does not consume any writer options.
   *
   * @param writerOptions ignored
   */
  @Override
  public void init(Map<String, String> writerOptions) {
    // Nothing to see here...
  }

  /**
   * Builds a CREATE TABLE statement from the batch schema and executes it.
   *
   * @param batch batch whose schema defines the columns of the target table
   * @throws UserException (data write error) if any field is REPEATED, or if
   *         executing the CREATE TABLE statement fails
   */
  @Override
  public void updateSchema(VectorAccessible batch) {
    BatchSchema schema = batch.getSchema();
    CreateTableStmtBuilder queryBuilder = new CreateTableStmtBuilder(tableIdentifier, dialect);

    for (MaterializedField field : schema) {
      logger.debug("Adding column {} of type {}.", field.getName(), field.getType().getMinorType());

      // Arrays cannot be expressed by this writer; fail before issuing any DDL.
      if (field.getType().getMode() == DataMode.REPEATED) {
        throw UserException.dataWriteError()
          .message("Drill does not yet support writing arrays to JDBC. " + field.getName() + " is an array.")
          .build(logger);
      }
      queryBuilder.addColumn(field);
    }

    String sql = queryBuilder.build();

    // Execute the query to build the schema
    try (Statement statement = connection.createStatement()) {
      logger.debug("Executing CREATE query: {}", sql);
      statement.execute(sql);
    } catch (SQLException e) {
      // Creating the table is part of the write path, so report it as a write error.
      throw UserException.dataWriteError(e)
        .message("The JDBC storage plugin failed while trying to create the schema. ")
        .addContext("Sql", sql)
        .build(logger);
    }
  }

  /** Resets the builder's current row before the field converters write into it. */
  @Override
  public void startRecord() {
    insertStatementBuilder.resetRow();
    logger.debug("Start record");
  }

  /**
   * Completes the current row and, once the number of buffered records reaches the
   * plugin's configured writer batch size, executes the accumulated INSERT.
   *
   * @throws IOException if executing the INSERT statement fails
   */
  @Override
  public void endRecord() throws IOException {
    logger.debug("Ending record");
    insertStatementBuilder.endRecord();

    recordCount++;

    if (recordCount >= config.getPlugin().getConfig().getWriterBatchSize()) {
      executeInsert(insertStatementBuilder.buildInsertQuery());

      // Reset the batch
      recordCount = 0;
    }

    insertStatementBuilder.resetRow();
  }

  /** Only logs; nothing is rolled back or closed here. */
  @Override
  public void abort() {
    logger.debug("Abort insert.");
  }

  /**
   * Flushes any records still buffered and closes the connection.
   *
   * <p>The connection is closed in a {@code finally} block so that it is released
   * even when building or executing the final INSERT throws.
   *
   * @throws IOException if executing the final INSERT statement fails
   */
  @Override
  public void cleanup() throws IOException {
    logger.debug("Cleanup record");
    try {
      if (recordCount != 0) {
        executeInsert(insertStatementBuilder.buildInsertQuery());
        recordCount = 0;
      }
    } finally {
      AutoCloseables.closeSilently(connection);
    }
  }

  /**
   * Executes one INSERT statement on this writer's connection.
   *
   * <p>On failure the connection is closed and the {@link SQLException} is rethrown
   * wrapped in an {@link IOException} that keeps it as the cause.
   *
   * @param insertQuery SQL text to execute
   * @throws IOException if the statement cannot be created or executed
   */
  private void executeInsert(String insertQuery) throws IOException {
    // try-with-resources closes the statement on every path; no explicit close is needed.
    try (Statement stmt = connection.createStatement()) {
      logger.debug("Executing insert query: {}", insertQuery);
      stmt.execute(insertQuery);
      logger.debug("Query complete");
    } catch (SQLException e) {
      logger.error("Error: {} {} {}", e.getMessage(), e.getSQLState(), e.getErrorCode());
      AutoCloseables.closeSilently(connection);
      throw new IOException(e.getMessage() + " " + e.getSQLState() + "\n" + insertQuery, e);
    }
  }

  /**
   * Wraps another converter for nullable columns: delegates when the reader has a
   * value, otherwise appends a SQL NULL literal to the current row.
   */
  public class NullableJdbcConverter extends FieldConverter {
    private final FieldConverter delegate;

    public NullableJdbcConverter(int fieldId, String fieldName, FieldReader reader, FieldConverter delegate) {
      super(fieldId, fieldName, reader);
      this.delegate = delegate;
    }

    @Override
    public void writeField() throws IOException {
      if (reader.isSet()) {
        delegate.writeField();
      } else {
        insertStatementBuilder.addRowValue(SqlLiteral.createNull(SqlParserPos.ZERO));
      }
    }
  }

  /**
   * Appends the reader's value as an exact numeric literal, using the string form
   * of the object returned by the reader.
   */
  public class ExactNumericJdbcConverter extends FieldConverter {

    public ExactNumericJdbcConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void writeField() {
      insertStatementBuilder.addRowValue(
        SqlLiteral.createExactNumeric(String.valueOf(reader.readObject()),
          SqlParserPos.ZERO));
    }
  }

  /**
   * Appends the reader's value as an approximate numeric literal, using the string
   * form of the object returned by the reader.
   */
  public class ApproxNumericJdbcConverter extends FieldConverter {

    public ApproxNumericJdbcConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void writeField() {
      insertStatementBuilder.addRowValue(
        SqlLiteral.createApproxNumeric(String.valueOf(reader.readObject()),
          SqlParserPos.ZERO));
    }
  }

  // ---- Integer types: written as exact numeric literals ----

  @Override
  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
  }

  // ---- Floating point types: written as approximate numeric literals ----

  @Override
  public FieldConverter getNewNullableFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ApproxNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
    return new ApproxNumericJdbcConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ApproxNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
    return new ApproxNumericJdbcConverter(fieldId, fieldName, reader);
  }

  // ---- Decimal: written as exact numeric literals ----

  @Override
  public FieldConverter getNewNullableVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new ExactNumericJdbcConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
    return new ExactNumericJdbcConverter(fieldId, fieldName, reader);
  }

  // ---- VARCHAR ----

  @Override
  public FieldConverter getNewNullableVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new VarCharJDBCConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
    return new VarCharJDBCConverter(fieldId, fieldName, reader);
  }

  /** Appends the reader's text value as a character string literal. */
  public class VarCharJDBCConverter extends FieldConverter {

    public VarCharJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
      super(fieldID, fieldName, reader);
    }

    @Override
    public void writeField() {
      byte[] bytes = reader.readText().copyBytes();
      // Drill's Text stores UTF-8; decode explicitly so the result does not depend
      // on the JVM's default charset.
      insertStatementBuilder.addRowValue(
        SqlLiteral.createCharString(new String(bytes, StandardCharsets.UTF_8),
          SqlParserPos.ZERO));
    }
  }

  // ---- DATE ----

  @Override
  public FieldConverter getNewNullableDateConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new DateJDBCConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
    return new DateJDBCConverter(fieldId, fieldName, reader);
  }

  /** Appends the reader's local date as a DATE literal, via its epoch-day count. */
  public class DateJDBCConverter extends FieldConverter {

    public DateJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
      super(fieldID, fieldName, reader);
    }

    @Override
    public void writeField() {
      insertStatementBuilder.addRowValue(
        SqlLiteral.createDate(DateString.fromDaysSinceEpoch((int) reader.readLocalDate().toEpochDay()),
          SqlParserPos.ZERO));
    }
  }

  // ---- TIME ----

  @Override
  public FieldConverter getNewNullableTimeConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new TimeJDBCConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewTimeConverter(int fieldId, String fieldName, FieldReader reader) {
    return new TimeJDBCConverter(fieldId, fieldName, reader);
  }

  /**
   * Appends the reader's local time as a TIME literal. The nano-of-day value is
   * integer-divided down to milliseconds, so any sub-millisecond part is dropped.
   */
  public class TimeJDBCConverter extends FieldConverter {

    public TimeJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
      super(fieldID, fieldName, reader);
    }

    @Override
    public void writeField() {
      insertStatementBuilder.addRowValue(
        SqlLiteral.createTime(TimeString.fromMillisOfDay(
            (int) (reader.readLocalTime().toNanoOfDay() / TimeUnit.MILLISECONDS.toNanos(1))),
          Types.DEFAULT_TIMESTAMP_PRECISION,
          SqlParserPos.ZERO));
    }
  }

  // ---- TIMESTAMP ----

  @Override
  public FieldConverter getNewNullableTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new TimeStampJDBCConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
    return new TimeStampJDBCConverter(fieldId, fieldName, reader);
  }

  /**
   * Appends the reader's local date-time as a TIMESTAMP literal. The value is
   * converted to epoch milliseconds at offset UTC, i.e. no time-zone shift is applied.
   */
  public class TimeStampJDBCConverter extends FieldConverter {

    public TimeStampJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
      super(fieldID, fieldName, reader);
    }

    @Override
    public void writeField() {
      insertStatementBuilder.addRowValue(
        SqlLiteral.createTimestamp(
          TimestampString.fromMillisSinceEpoch(reader.readLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli()),
          Types.DEFAULT_TIMESTAMP_PRECISION,
          SqlParserPos.ZERO));
    }
  }

  // ---- BIT (boolean) ----

  @Override
  public FieldConverter getNewNullableBitConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableJdbcConverter(fieldId, fieldName, reader,
      new BitJDBCConverter(fieldId, fieldName, reader));
  }

  @Override
  public FieldConverter getNewBitConverter(int fieldId, String fieldName, FieldReader reader) {
    return new BitJDBCConverter(fieldId, fieldName, reader);
  }

  /** Appends the reader's boolean value as a BOOLEAN literal. */
  public class BitJDBCConverter extends FieldConverter {

    public BitJDBCConverter(int fieldID, String fieldName, FieldReader reader) {
      super(fieldID, fieldName, reader);
    }

    @Override
    public void writeField() {
      insertStatementBuilder.addRowValue(SqlLiteral.createBoolean(reader.readBoolean(), SqlParserPos.ZERO));
    }
  }
}