blob: ab88326e571df48236a28d783f703b6b9238728a [file] [log] [blame]
package org.apache.hawq.pxf.plugins.jdbc;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.ReadResolver;
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.WriteResolver;
import java.util.List;
import java.util.LinkedList;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* JDBC tables resolver
*/
public class JdbcResolver extends JdbcPlugin implements ReadResolver, WriteResolver {
/**
* Class constructor
*/
public JdbcResolver(InputData input) throws UserDataException {
super(input);
}
/**
* getFields() implementation
*
* @throws SQLException if the provided {@link OneRow} object is invalid
*/
@Override
public List<OneField> getFields(OneRow row) throws SQLException {
ResultSet result = (ResultSet) row.getData();
LinkedList<OneField> fields = new LinkedList<>();
for (ColumnDescriptor column : columns) {
String colName = column.columnName();
Object value = null;
OneField oneField = new OneField();
oneField.type = column.columnTypeCode();
switch (DataType.get(oneField.type)) {
case INTEGER:
value = result.getInt(colName);
break;
case FLOAT8:
value = result.getDouble(colName);
break;
case REAL:
value = result.getFloat(colName);
break;
case BIGINT:
value = result.getLong(colName);
break;
case SMALLINT:
value = result.getShort(colName);
break;
case BOOLEAN:
value = result.getBoolean(colName);
break;
case BYTEA:
value = result.getBytes(colName);
break;
case VARCHAR:
case BPCHAR:
case TEXT:
case NUMERIC:
value = result.getString(colName);
break;
case DATE:
value = result.getDate(colName);
break;
case TIMESTAMP:
value = result.getTimestamp(colName);
break;
default:
throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
}
oneField.val = value;
fields.add(oneField);
}
return fields;
}
/**
* setFields() implementation
*
* @return OneRow with the data field containing a List<OneField>
* OneFields are not reordered before being passed to Accessor; at the
* moment, there is no way to correct the order of the fields if it is not.
* In practice, the 'record' provided is always ordered the right way.
*
* @throws UnsupportedOperationException if field of some type is not supported
*/
@Override
public OneRow setFields(List<OneField> record) throws UnsupportedOperationException, ParseException {
int column_index = 0;
for (OneField oneField : record) {
ColumnDescriptor column = columns.get(column_index);
if (
LOG.isDebugEnabled() &&
DataType.get(column.columnTypeCode()) != DataType.get(oneField.type)
) {
LOG.warn("The provided tuple of data may be disordered. Datatype of column with descriptor '" + column.toString() + "' must be '" + DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + DataType.get(oneField.type).toString() + "'");
}
// Check that data type is supported
switch (DataType.get(oneField.type)) {
case BOOLEAN:
case INTEGER:
case FLOAT8:
case REAL:
case BIGINT:
case SMALLINT:
case NUMERIC:
case VARCHAR:
case BPCHAR:
case TEXT:
case BYTEA:
case TIMESTAMP:
case DATE:
break;
default:
throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
}
if (
LOG.isDebugEnabled() &&
DataType.get(oneField.type) == DataType.BYTEA
) {
LOG.debug("OneField content (conversion from BYTEA): '" + new String((byte[])oneField.val) + "'");
}
// Convert TEXT columns into native data types
if ((DataType.get(oneField.type) == DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
oneField.type = column.columnTypeCode();
if (oneField.val != null) {
String rawVal = (String)oneField.val;
if (LOG.isDebugEnabled()) {
LOG.debug("OneField content (conversion from TEXT): '" + rawVal + "'");
}
switch (DataType.get(column.columnTypeCode())) {
case VARCHAR:
case BPCHAR:
case TEXT:
case BYTEA:
break;
case BOOLEAN:
oneField.val = (Object)Boolean.parseBoolean(rawVal);
break;
case INTEGER:
oneField.val = (Object)Integer.parseInt(rawVal);
break;
case FLOAT8:
oneField.val = (Object)Double.parseDouble(rawVal);
break;
case REAL:
oneField.val = (Object)Float.parseFloat(rawVal);
break;
case BIGINT:
oneField.val = (Object)Long.parseLong(rawVal);
break;
case SMALLINT:
oneField.val = (Object)Short.parseShort(rawVal);
break;
case NUMERIC:
oneField.val = (Object)new BigDecimal(rawVal);
break;
case TIMESTAMP:
boolean isConversionSuccessful = false;
for (SimpleDateFormat sdf : timestampSDFs.get()) {
try {
java.util.Date parsedTimestamp = sdf.parse(rawVal);
oneField.val = (Object)new Timestamp(parsedTimestamp.getTime());
isConversionSuccessful = true;
break;
}
catch (ParseException e) {
// pass
}
}
if (!isConversionSuccessful) {
throw new ParseException(rawVal, 0);
}
break;
case DATE:
oneField.val = (Object)new Date(dateSDF.get().parse(rawVal).getTime());
break;
default:
throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
}
}
}
column_index += 1;
}
return new OneRow(new LinkedList<OneField>(record));
}
/**
* Decode OneRow object and pass all its contents to a PreparedStatement
*
* @throws IOException if data in a OneRow is corrupted
* @throws SQLException if the given statement is broken
*/
@SuppressWarnings("unchecked")
public static void decodeOneRowToPreparedStatement(OneRow row, PreparedStatement statement) throws IOException, SQLException {
// This is safe: OneRow comes from JdbcResolver
List<OneField> tuple = (List<OneField>)row.getData();
for (int i = 1; i <= tuple.size(); i++) {
OneField field = tuple.get(i - 1);
switch (DataType.get(field.type)) {
case INTEGER:
if (field.val == null) {
statement.setNull(i, Types.INTEGER);
}
else {
statement.setInt(i, (int)field.val);
}
break;
case BIGINT:
if (field.val == null) {
statement.setNull(i, Types.INTEGER);
}
else {
statement.setLong(i, (long)field.val);
}
break;
case SMALLINT:
if (field.val == null) {
statement.setNull(i, Types.INTEGER);
}
else {
statement.setShort(i, (short)field.val);
}
break;
case REAL:
if (field.val == null) {
statement.setNull(i, Types.FLOAT);
}
else {
statement.setFloat(i, (float)field.val);
}
break;
case FLOAT8:
if (field.val == null) {
statement.setNull(i, Types.DOUBLE);
}
else {
statement.setDouble(i, (double)field.val);
}
break;
case BOOLEAN:
if (field.val == null) {
statement.setNull(i, Types.BOOLEAN);
}
else {
statement.setBoolean(i, (boolean)field.val);
}
break;
case NUMERIC:
if (field.val == null) {
statement.setNull(i, Types.NUMERIC);
}
else {
statement.setBigDecimal(i, (BigDecimal)field.val);
}
break;
case VARCHAR:
case BPCHAR:
case TEXT:
if (field.val == null) {
statement.setNull(i, Types.VARCHAR);
}
else {
statement.setString(i, (String)field.val);
}
break;
case BYTEA:
if (field.val == null) {
statement.setNull(i, Types.BINARY);
}
else {
statement.setBytes(i, (byte[])field.val);
}
break;
case TIMESTAMP:
if (field.val == null) {
statement.setNull(i, Types.TIMESTAMP);
}
else {
statement.setTimestamp(i, (Timestamp)field.val);
}
break;
case DATE:
if (field.val == null) {
statement.setNull(i, Types.DATE);
}
else {
statement.setDate(i, (Date)field.val);
}
break;
default:
throw new IOException("The data tuple from JdbcResolver is corrupted");
}
}
}
private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
// SimpleDateFormat to parse TEXT into DATE
private static ThreadLocal<SimpleDateFormat> dateSDF = new ThreadLocal<SimpleDateFormat>() {
@Override protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd");
}
};
// SimpleDateFormat to parse TEXT into TIMESTAMP (with microseconds)
private static ThreadLocal<SimpleDateFormat[]> timestampSDFs = new ThreadLocal<SimpleDateFormat[]>() {
@Override protected SimpleDateFormat[] initialValue() {
SimpleDateFormat[] retRes = {
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSS"),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS"),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS"),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"),
new SimpleDateFormat("yyyy-MM-dd")
};
return retRes;
}
};
}