/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.phoenix;

import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayBigintDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayBooleanDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayDoubleDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayIntegerDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArraySmallintDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayTinyintDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ArrayVarcharDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.ColumnDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericDateDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericTimeDefn;
import org.apache.drill.exec.store.phoenix.PhoenixReader.GenericTimestampDefn;
import org.apache.drill.exec.util.ImpersonationUtil;

import com.google.common.base.Stopwatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

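/**
 * A managed reader that reads rows from Apache Phoenix over JDBC. It executes
 * the sub-scan's SQL query, derives the Drill schema from the JDBC
 * {@link ResultSetMetaData}, and loads rows into the row set framework batch
 * by batch. When user impersonation is enabled, all JDBC calls run as the
 * Drill process user via {@link UserGroupInformation#doAs}.
 */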
public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {

  private static final Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);

  private final PhoenixSubScan subScan;
  private final boolean impersonationEnabled;
  private final UserGroupInformation ugi = ImpersonationUtil.getProcessUserUGI();

  private CustomErrorContext errorContext;
  private PhoenixReader reader;
  private Connection conn;
  private PreparedStatement pstmt;
  private ResultSet results;
  private ResultSetMetaData meta;
  private ColumnDefn[] columns;
  private Stopwatch watch;

  public PhoenixBatchReader(PhoenixSubScan subScan) {
    this.subScan = subScan;
    this.impersonationEnabled = subScan.getPlugin().getContext().getConfig()
      .getBoolean(ExecConstants.IMPERSONATION_ENABLED);
  }

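  /**
   * Opens the reader. When user impersonation is enabled, the JDBC work is
   * performed as the Drill process user inside a UGI doAs block; otherwise
   * it runs directly on the calling thread.
   */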
  @Override
  public boolean open(SchemaNegotiator negotiator) {
    return impersonationEnabled
      ? ugi.doAs((PrivilegedAction<Boolean>) () -> processOpen(negotiator))
      : processOpen(negotiator);
  }

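  /**
   * Prepares and executes the sub-scan's SQL statement, negotiates the table
   * schema from the JDBC metadata, and binds the column writers.
   */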
  private boolean processOpen(SchemaNegotiator negotiator) {
    try {
      errorContext = negotiator.parentErrorContext();
      DataSource ds = subScan.getPlugin().getDataSource(negotiator.userName());
      // Keep a reference to the connection so that it can be closed in close().
      conn = ds.getConnection();
      pstmt = conn.prepareStatement(subScan.getSql());
      results = pstmt.executeQuery();
      meta = pstmt.getMetaData();
    } catch (SQLException e) {
      throw UserException
        .dataReadError(e)
        .message("Failed to execute the Phoenix SQL query. " + e.getMessage())
        .addContext(errorContext)
        .build(logger);
    }
    try {
      negotiator.tableSchema(defineMetadata(), true);
      reader = new PhoenixReader(negotiator.build(), columns, results);
      bindColumns(reader.getStorage());
    } catch (SQLException e) {
      throw UserException
        .dataReadError(e)
        .message("Failed to get the column types from the metadata. " + e.getMessage())
        .addContext(errorContext)
        .build(logger);
    }
    watch = Stopwatch.createStarted();
    return true;
  }

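  /**
   * Reads the next batch of rows, running under the process-user UGI when
   * impersonation is enabled.
   */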
  @Override
  public boolean next() {
    return impersonationEnabled
      ? ugi.doAs((PrivilegedAction<Boolean>) this::processNext)
      : processNext();
  }

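  /**
   * Copies rows from the result set into the current batch until either the
   * batch is full (returns true, more rows may remain) or the result set is
   * exhausted (returns false).
   */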
  private boolean processNext() {
    try {
      while (!reader.getStorage().isFull()) {
        if (!reader.processRow()) { // processRow() returns true if a row was copied.
          watch.stop();
          logger.debug("Phoenix fetched {} records in total.", reader.getRowCount());
          return false; // EOF reached.
        }
      }
      return true; // The batch is full, but EOF has not been reached.
    } catch (SQLException e) {
      throw UserException
        .dataReadError(e)
        .message("Failed to get the data from the result set. " + e.getMessage())
        .addContext(errorContext)
        .build(logger);
    }
  }

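  /**
   * Logs the batch statistics and closes the JDBC resources quietly.
   */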
  @Override
  public void close() {
    if (watch != null) { // watch is null if open() failed before the query ran.
      logger.debug("Phoenix fetched {} batches, took {} ms.", reader.getBatchCount(), watch.elapsed(TimeUnit.MILLISECONDS));
    }
    AutoCloseables.closeSilently(results, pstmt, conn, reader);
  }

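  /**
   * Builds the Drill schema from the JDBC ResultSetMetaData, creating a
   * ColumnDefn for each projected column.
   */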
  private TupleMetadata defineMetadata() throws SQLException {
    List<SchemaPath> cols = subScan.getColumns();
    columns = new ColumnDefn[cols.size()];
    SchemaBuilder builder = new SchemaBuilder();
    for (int index = 0; index < cols.size(); index++) {
      int columnIndex = index + 1; // JDBC column indexes are 1-based.
      int sqlType = meta.getColumnType(columnIndex);
      String columnName = cols.get(index).rootName();
      columns[index] = makeColumn(columnName, sqlType, meta.getColumnTypeName(columnIndex), columnIndex);
      columns[index].define(builder);
    }
    return builder.buildSchema();
  }

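  /**
   * Maps a JDBC column to the matching ColumnDefn. Array columns are
   * dispatched on the Phoenix base type name; scalar columns on the
   * java.sql.Types code. Unsupported types raise a data read error.
   */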
  private ColumnDefn makeColumn(String name, int sqlType, String baseType, int index) {
    if (sqlType == Types.ARRAY) { // Supports the array data types.
      if (baseType.equals(ArrayDefn.VARCHAR) || baseType.equals(ArrayDefn.CHAR)) {
        return new ArrayVarcharDefn(name, index, sqlType, baseType);
      } else if (baseType.equals(ArrayDefn.BIGINT)) {
        return new ArrayBigintDefn(name, index, sqlType, baseType);
      } else if (baseType.equals(ArrayDefn.INTEGER)) {
        return new ArrayIntegerDefn(name, index, sqlType, baseType);
      } else if (baseType.equals(ArrayDefn.SMALLINT)) {
        return new ArraySmallintDefn(name, index, sqlType, baseType);
      } else if (baseType.equals(ArrayDefn.TINYINT)) {
        return new ArrayTinyintDefn(name, index, sqlType, baseType);
      } else if (baseType.equals(ArrayDefn.DOUBLE) || baseType.equals(ArrayDefn.FLOAT)) {
        return new ArrayDoubleDefn(name, index, sqlType, baseType);
      } else if (baseType.equals(ArrayDefn.BOOLEAN)) {
        return new ArrayBooleanDefn(name, index, sqlType, baseType);
      } else {
        throw UserException.dataReadError()
          .message("The Phoenix reader does not support the array base type: " + baseType)
          .addContext(errorContext)
          .build(logger);
      }
    }
    // Supports the scalar data types.
    if (sqlType == Types.DATE) {
      return new GenericDateDefn(name, index, sqlType);
    } else if (sqlType == Types.TIME) {
      return new GenericTimeDefn(name, index, sqlType);
    } else if (sqlType == Types.TIMESTAMP) {
      return new GenericTimestampDefn(name, index, sqlType);
    } else if (sqlType == Types.VARCHAR
        || sqlType == Types.CHAR
        || sqlType == Types.BIGINT
        || sqlType == Types.INTEGER
        || sqlType == Types.SMALLINT
        || sqlType == Types.TINYINT
        || sqlType == Types.DOUBLE
        || sqlType == Types.FLOAT
        || sqlType == Types.DECIMAL
        || sqlType == Types.BINARY
        || sqlType == Types.VARBINARY
        || sqlType == Types.BOOLEAN) {
      return new GenericDefn(name, index, sqlType);
    } else {
      throw UserException.dataReadError()
        .message("The Phoenix reader does not support this data type: java.sql.Types " + sqlType)
        .addContext(errorContext)
        .build(logger);
    }
  }

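  /**
   * Binds each column definition to the row set loader's column writers.
   */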
  private void bindColumns(RowSetLoader loader) {
    for (ColumnDefn column : columns) {
      column.bind(loader);
    }
  }
}