blob: 76ed3ab3ddb63c9f1d42506d1a0970cacb17ccca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.jdbc;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.TimeDatum;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedDataTypeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.datetime.DateTimeUtil;
import java.io.Closeable;
import java.io.IOException;
import java.sql.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Properties;
public abstract class JdbcScanner implements Scanner {
/** Class-wide logger; used for the generated SQL statement and for failures while closing. */
private static final Log LOG = LogFactory.getLog(JdbcScanner.class);
/** Database metadata handed to the SQL builder and the SQL expression generator. */
protected final DatabaseMetaData dbMetaData;
/** JDBC connection properties, passed to DriverManager when the query is executed. */
protected final Properties connProperties;
/** Name of the scanned table, extracted from the fragment URI in the constructor. */
protected final String tableName;
/** Table schema; supplies the default projection in init() and is returned by getSchema(). */
protected final Schema schema;
/** Table properties as given to the constructor; not read anywhere else in this class. */
protected final TableMeta tableMeta;
/** Fragment to scan; its URI is used as the JDBC connection URL. */
protected final JdbcFragment fragment;
/** Input statistics; the row count is written when the result iterator is closed. */
protected final TableStats stats;
/** Builder that turns the scan parameters (or a pushed-down plan) into the SQL text. */
protected final SQLBuilder builder;
/** Columns to read, in output order; null until setTarget() or init() assigns them. */
protected Column [] targets;
/** Optional filter passed to the SQL builder; null when no filter was set. */
protected EvalNode filter;
/** Optional row limit passed to the SQL builder; null when setLimit() was never called. */
protected Long limit;
/** Optional pushed-down plan; when non-null, init() builds the SQL from it instead of targets/filter/limit. */
protected LogicalNode planPart;
/** Single tuple instance, allocated in init() and refilled for every row returned. */
protected VTuple outTuple;
/** SQL statement produced by init() and executed on the first call to next(). */
protected String generatedSql;
/** Iterator over the query result; created lazily by next(), null before that. */
protected ResultSetIterator iter;
/** Number of tuples returned so far by the result iterator. */
protected int recordCount = 0;
/**
 * Creates a scanner for the table addressed by the URI of {@code fragment}.
 *
 * <p>Note that {@link #getSQLBuilder()} is invoked from this constructor, i.e. before any
 * subclass constructor body has run.
 *
 * @param dbMetaData DatabaseMetaData
 * @param connProperties JDBC Connection Properties
 * @param tableSchema Table Schema
 * @param tableMeta Table Properties
 * @param fragment Fragment
 * @throws NullPointerException if any argument is null
 */
public JdbcScanner(final DatabaseMetaData dbMetaData,
                   final Properties connProperties,
                   final Schema tableSchema,
                   final TableMeta tableMeta,
                   final JdbcFragment fragment) {
  // checkNotNull returns its argument, so validation and assignment are combined;
  // the arguments are still validated in declaration order before anything is dereferenced.
  this.dbMetaData = Preconditions.checkNotNull(dbMetaData);
  this.connProperties = Preconditions.checkNotNull(connProperties);
  this.schema = Preconditions.checkNotNull(tableSchema);
  this.tableMeta = Preconditions.checkNotNull(tableMeta);
  this.fragment = Preconditions.checkNotNull(fragment);

  this.tableName = ConnectionInfo.fromURI(fragment.getUri()).tableName;
  this.stats = new TableStats();
  this.builder = getSQLBuilder();
}
/**
 * Prepares the scan: resolves the projection, allocates the reusable output tuple and
 * generates the SQL text. No connection is opened here; the query runs on the first
 * call to {@link #next()}.
 *
 * @throws IOException declared by the interface; not thrown by the code in this method
 */
@Override
public void init() throws IOException {
  // Without an explicit projection every column of the table schema is read.
  if (targets == null) {
    targets = schema.toArray();
  }
  outTuple = new VTuple(targets.length);

  // A pushed-down plan takes precedence over the individual targets/filter/limit settings.
  generatedSql = (planPart != null)
      ? builder.build(planPart)
      : builder.build(tableName, targets, filter, limit);
}
/**
 * Returns the next row, executing the generated SQL on the first call.
 *
 * @return the next tuple, or {@code null} once the result is exhausted
 * @throws IOException declared by the interface; not thrown by the code in this method
 */
@Override
public Tuple next() throws IOException {
  // The query is executed lazily, on first use.
  if (iter == null) {
    iter = executeQueryAndGetIter();
  }
  return iter.hasNext() ? iter.next() : null;
}
/**
 * Restarts the scan so that the following {@link #next()} call returns the first row again.
 *
 * <p>The query is run through {@code Connection.createStatement()} with default settings,
 * which per the JDBC specification yields a forward-only result set that cannot be moved
 * back to its start. The current cursor is therefore closed and discarded, and the next
 * call to {@link #next()} executes the generated SQL again.
 *
 * <p>Does nothing if the query has not been executed yet.
 *
 * @throws IOException if closing the current cursor fails
 */
@Override
public void reset() throws IOException {
  if (iter == null) {
    return;
  }
  try {
    iter.close();
  } finally {
    // Drop the cursor even if close() failed, so that next() starts a fresh query.
    iter = null;
  }
}
/**
 * Closes the result iterator if the query was ever executed; otherwise does nothing.
 *
 * @throws IOException if the iterator fails to close
 */
@Override
public void close() throws IOException {
  if (iter == null) {
    return;
  }
  iter.close();
}
/**
 * Stores a plan fragment to be executed by the database itself. When set, init() builds
 * the SQL from this plan instead of from the targets/filter/limit settings, so it only
 * takes effect if called before init().
 *
 * @param planPart plan fragment to translate into SQL
 */
@Override
public void pushOperators(LogicalNode planPart) {
this.planPart = planPart;
}
/**
 * Always returns true; the columns given to setTarget() are passed to the SQL builder.
 *
 * @return true
 */
@Override
public boolean isProjectable() {
return true;
}
/**
 * Sets the columns to read. init() sizes the output tuple and builds the SQL from this
 * array, so it only takes effect if called before init(). The array is stored as is,
 * without copying.
 *
 * @param targets columns to read, in output order
 */
@Override
public void setTarget(Column [] targets) {
this.targets = targets;
}
/**
 * Always returns true; the filter given to setFilter() is passed to the SQL builder.
 *
 * @return true
 */
@Override
public boolean isSelectable() {
return true;
}
/**
 * Sets the filter expression handed to the SQL builder in init(); it only takes effect
 * if called before init().
 *
 * @param filter filter expression
 */
@Override
public void setFilter(EvalNode filter) {
this.filter = filter;
}
/**
 * Sets the row limit handed to the SQL builder in init(); it only takes effect if called
 * before init(). Until this is called the limit is {@code null}.
 *
 * @param num maximum number of rows
 */
@Override
public void setLimit(long num) {
  // Stored boxed so that "no limit" can be represented as null.
  this.limit = Long.valueOf(num);
}
/**
 * Always returns false.
 *
 * @return false
 */
@Override
public boolean isSplittable() {
return false;
}
/**
 * Progress is not tracked by this scanner.
 *
 * @return always 0
 */
@Override
public float getProgress() {
  return 0.0f;
}
/**
 * Returns the statistics object of this scanner. Its row count is only written when the
 * result iterator is closed.
 *
 * @return the statistics object created in the constructor
 */
@Override
public TableStats getInputStats() {
return stats;
}
/**
 * Returns the table schema given to the constructor (not the projected target columns).
 *
 * @return the table schema
 */
@Override
public Schema getSchema() {
return schema;
}
/**
 * Creates the SQL builder used by this scanner.
 *
 * <p>This method is called from the constructor. An overriding implementation therefore
 * runs before the subclass constructor body and must not depend on subclass state.
 *
 * @return a new builder wired with the expression generator from {@link #getSQLExprBuilder()}
 */
protected SQLBuilder getSQLBuilder() {
  final SQLExpressionGenerator exprGenerator = getSQLExprBuilder();
  return new SQLBuilder(dbMetaData, exprGenerator);
}
/**
 * Creates the SQL expression generator given to the SQL builder. It is called by
 * getSQLBuilder(), which in turn is called from the constructor, so an override runs
 * before the subclass constructor body.
 *
 * @return a new expression generator bound to the database metadata
 */
protected SQLExpressionGenerator getSQLExprBuilder() {
return new SQLExpressionGenerator(dbMetaData);
}
/**
 * Copies the current row of {@code resultSet} into {@code tuple}, converting each target
 * column to the datum type that matches its declared data type.
 *
 * <p>Column {@code i} of {@code targets} is read from result column {@code i + 1} and
 * written to tuple slot {@code i}. Every slot is overwritten on each call, so a reused
 * tuple never keeps values from a previous row. SQL NULL values are stored as a NULL
 * datum for all types.
 *
 * @param resultSet result set positioned on the row to convert
 * @param tuple tuple to fill; must have at least {@code targets.length} slots
 * @throws TajoInternalError if a column has an unsupported data type or the JDBC driver
 *         reports an error
 */
protected void convertTuple(ResultSet resultSet, VTuple tuple) {
  try {
    for (int columnIdx = 0; columnIdx < targets.length; columnIdx++) {
      final Column column = targets[columnIdx];
      // JDBC column indexes are 1-based.
      final int resultIdx = columnIdx + 1;

      switch (column.getDataType().getType()) {
        case INT1:
        case INT2:
          tuple.put(columnIdx, DatumFactory.createInt2(resultSet.getShort(resultIdx)));
          break;
        case INT4:
          tuple.put(columnIdx, DatumFactory.createInt4(resultSet.getInt(resultIdx)));
          break;
        case INT8:
          tuple.put(columnIdx, DatumFactory.createInt8(resultSet.getLong(resultIdx)));
          break;
        case FLOAT4:
          tuple.put(columnIdx, DatumFactory.createFloat4(resultSet.getFloat(resultIdx)));
          break;
        case FLOAT8:
          tuple.put(columnIdx, DatumFactory.createFloat8(resultSet.getDouble(resultIdx)));
          break;
        case CHAR: {
          final String chars = resultSet.getString(resultIdx);
          if (chars != null) {
            tuple.put(columnIdx, DatumFactory.createText(chars));
          }
          break;
        }
        case VARCHAR:
        case TEXT: {
          // TODO - trim is unnecessary in many cases, so we can use it for certain cases
          final String text = resultSet.getString(resultIdx);
          if (text != null) {
            tuple.put(columnIdx, DatumFactory.createText(text.trim()));
          }
          break;
        }
        case DATE: {
          final Date date = resultSet.getDate(resultIdx);
          if (date != null) {
            // getYear() is 1900-based and getMonth() is 0-based.
            tuple.put(columnIdx,
                DatumFactory.createDate(1900 + date.getYear(), 1 + date.getMonth(), date.getDate()));
          }
          break;
        }
        case TIME: {
          final Time time = resultSet.getTime(resultIdx);
          if (time != null) {
            // Sub-second precision is not carried over; the fraction is always 0.
            tuple.put(columnIdx, new TimeDatum(
                DateTimeUtil.toTime(time.getHours(), time.getMinutes(), time.getSeconds(), 0)));
          }
          break;
        }
        case TIMESTAMP: {
          final Timestamp timestamp = resultSet.getTimestamp(resultIdx);
          if (timestamp != null) {
            tuple.put(columnIdx, DatumFactory.createTimestampDatumWithJavaMillis(timestamp.getTime()));
          }
          break;
        }
        case BINARY:
        case VARBINARY:
        case BLOB: {
          final byte[] bytes = resultSet.getBytes(resultIdx);
          if (bytes != null) {
            tuple.put(columnIdx, DatumFactory.createBlob(bytes));
          }
          break;
        }
        default:
          throw new TajoInternalError(new UnsupportedDataTypeException(column.getDataType().getType().name()));
      }

      // Each case above performs exactly one getter call, so wasNull() refers to this column.
      // Primitive getters return 0 for SQL NULL and object getters return null; in both
      // cases the slot must hold a NULL datum rather than 0 or a stale value.
      if (resultSet.wasNull()) {
        tuple.put(columnIdx, DatumFactory.createNullDatum());
      }
    }
  } catch (SQLException s) {
    throw new TajoInternalError(s);
  }
}
/**
 * Opens a JDBC connection to the fragment URI, executes the generated SQL and wraps the
 * result set in an iterator.
 *
 * <p>On success the connection and statement are left open, because the returned iterator
 * reads from the live result set. If anything fails before the iterator is returned, the
 * statement and connection opened so far are closed here so that they do not leak.
 *
 * @return an iterator over the query result
 * @throws TajoInternalError if connecting or executing the query fails
 */
private ResultSetIterator executeQueryAndGetIter() {
  Connection conn = null;
  Statement statement = null;
  boolean handedOver = false;
  try {
    LOG.info("Generated SQL: " + generatedSql);
    conn = DriverManager.getConnection(fragment.getUri().toASCIIString(), connProperties);
    statement = conn.createStatement();
    final ResultSetIterator iterator = new ResultSetIterator(statement.executeQuery(generatedSql));
    handedOver = true;
    return iterator;
  } catch (SQLException s) {
    throw new TajoInternalError(s);
  } finally {
    // Reached with handedOver == false on any failure, including unchecked exceptions.
    if (!handedOver) {
      if (statement != null) {
        try {
          statement.close();
        } catch (SQLException e) {
          LOG.warn(e);
        }
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException e) {
          LOG.warn(e);
        }
      }
    }
  }
}
/**
 * Iterator over a JDBC result set that converts each row into a tuple.
 *
 * <p>This is an inner class: it fills the enclosing scanner's {@code outTuple}, increments
 * its {@code recordCount} and writes its {@code stats}. {@link #next()} returns the same
 * tuple instance on every call, so a caller that keeps rows must copy them.
 */
public class ResultSetIterator implements Iterator<Tuple>, Closeable {
  private final ResultSet resultSet;
  /** True when hasNext() has already advanced the cursor and that row is not consumed yet. */
  private boolean didNext = false;
  /** Result of the last cursor advance; only meaningful while didNext is true. */
  private boolean hasNext = false;
  /** Set by close() so that repeated calls are harmless. */
  private boolean closed = false;

  /**
   * @param resultSet result set to iterate; expected to be positioned before its first row
   */
  public ResultSetIterator(ResultSet resultSet) {
    this.resultSet = resultSet;
  }

  /**
   * Advances the cursor at most once per row and remembers the outcome until next()
   * consumes it, so repeated calls do not skip rows.
   *
   * @return true if another row is available
   * @throws RuntimeException wrapping the SQLException if the driver fails
   */
  @Override
  public boolean hasNext() {
    if (!didNext) {
      try {
        hasNext = resultSet.next();
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
      didNext = true;
    }
    return hasNext;
  }

  /**
   * Converts the next row into the shared output tuple.
   *
   * @return the enclosing scanner's output tuple, refilled with the next row
   * @throws NoSuchElementException if the result set has no more rows
   */
  @Override
  public Tuple next() {
    // Going through hasNext() advances the cursor if needed and, unlike a bare
    // resultSet.next(), does not ignore the "no more rows" outcome.
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    didNext = false;
    convertTuple(resultSet, outTuple);
    recordCount++;
    return outTuple;
  }

  /** Not supported. */
  @Override
  public void remove() {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  /**
   * Moves the cursor back in front of the first row and clears the look-ahead state.
   *
   * <p>Per the JDBC specification {@code ResultSet.beforeFirst()} fails with an SQLException
   * on a forward-only result set, so this requires a scrollable one.
   *
   * @throws TajoInternalError if the driver cannot reposition the cursor
   */
  public void rewind() {
    try {
      resultSet.beforeFirst();
    } catch (SQLException e) {
      throw new TajoInternalError(e);
    }
    didNext = false;
    hasNext = false;
  }

  /**
   * Closes the result set together with the statement and connection that produced it,
   * then records the number of rows returned so far in the scanner's statistics.
   *
   * <p>The enclosing scanner opens a dedicated connection for each query and nothing else
   * in it closes that connection, so it is released here. Failures while closing are
   * logged and otherwise ignored. Calling this method more than once has no further effect.
   *
   * @throws IOException declared by Closeable; not thrown by the code in this method
   */
  @Override
  public void close() throws IOException {
    if (closed) {
      return;
    }
    closed = true;

    // The owners must be looked up first: getStatement() is not allowed on a closed result set.
    Statement statement = null;
    Connection connection = null;
    try {
      statement = resultSet.getStatement();
      if (statement != null) {
        connection = statement.getConnection();
      }
    } catch (SQLException e) {
      LOG.warn(e);
    }

    try {
      resultSet.close();
    } catch (SQLException e) {
      LOG.warn(e);
    }
    if (statement != null) {
      try {
        statement.close();
      } catch (SQLException e) {
        LOG.warn(e);
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (SQLException e) {
        LOG.warn(e);
      }
    }

    if (stats != null) {
      stats.setNumRows(recordCount);
    }
  }
}
}