// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.extdatasource;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Timestamp;
import java.util.List;
import java.util.UUID;

import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
import org.apache.impala.extdatasource.thrift.TCloseParams;
import org.apache.impala.extdatasource.thrift.TCloseResult;
import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.thrift.TGetNextParams;
import org.apache.impala.extdatasource.thrift.TGetNextResult;
import org.apache.impala.extdatasource.thrift.TOpenParams;
import org.apache.impala.extdatasource.thrift.TOpenResult;
import org.apache.impala.extdatasource.thrift.TPrepareParams;
import org.apache.impala.extdatasource.thrift.TPrepareResult;
import org.apache.impala.extdatasource.thrift.TRowBatch;
import org.apache.impala.extdatasource.thrift.TTableSchema;
import org.apache.impala.extdatasource.util.SerializationUtils;
import org.apache.impala.extdatasource.v1.ExternalDataSource;
import org.apache.impala.thrift.TColumnData;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPrimitiveType;
import org.apache.impala.thrift.TScalarType;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TTypeNodeType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
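
// How this class is typically exercised: the frontend calls prepare() on one
// instance, and the backend then drives open()/getNext()/close() on another.
// Below is a sketch of the DDL a test might use to register it; the data source
// name and jar path are illustrative assumptions, not defined in this file:
//
//   CREATE DATA SOURCE test_source LOCATION '/path/to/data-source-test.jar'
//   CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' API_VERSION 'V1';
//   CREATE TABLE alltypes_external (...) PRODUCED BY DATA SOURCE test_source;
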
/**
* Data source implementation for tests that can:
* (a) Accept every other offered conjunct to exercise planning (the predicates are
* not actually evaluated) and return trivial data of all supported types for
* query tests.
* (b) Validate the predicates offered by Impala.
*/
public class AllTypesDataSource implements ExternalDataSource {
// Total number of rows to return.
private static final int NUM_ROWS_RETURNED = 5000;
// Size of the first row batch; every subsequent batch grows by BATCH_SIZE_INCREMENT
// so that Impala sees batches of varying sizes.
private static final int INITIAL_BATCH_SIZE = 500;
private static final int BATCH_SIZE_INCREMENT = 100;
private static final TStatus STATUS_OK =
new TStatus(TErrorCode.OK, Lists.<String>newArrayList());
private int currRow_;
private boolean eos_;
private int batchSize_;
private TTableSchema schema_;
private DataSourceState state_;
private String scanHandle_;
private String validatePredicatesResult_;

// Enumerates the states of the data source.
private enum DataSourceState {
CREATED,
OPENED,
CLOSED
}

public AllTypesDataSource() {
eos_ = false;
currRow_ = 0;
state_ = DataSourceState.CREATED;
}

/**
* Accepts every other conjunct (or all conjuncts when predicate validation is
* requested) and returns the constant number of rows that is always returned.
*/
@Override
public TPrepareResult prepare(TPrepareParams params) {
Preconditions.checkState(state_ == DataSourceState.CREATED);
List<Integer> accepted = Lists.newArrayList();
int numRowsReturned = 0;
if (validatePredicates(params.getPredicates())) {
// Indicate all predicates are applied because we return a dummy row with the
// result later to validate the result in tests. Impala shouldn't try to apply
// predicates to that dummy row.
for (int i = 0; i < params.getPredicatesSize(); ++i) accepted.add(i);
numRowsReturned = 1;
} else {
// Default behavior is to accept every other predicate. They are not actually
// applied, but we want to validate that Impala applies the correct predicates.
for (int i = 0; i < params.getPredicatesSize(); ++i) {
if (i % 2 == 0) accepted.add(i);
}
numRowsReturned = NUM_ROWS_RETURNED;
}
return new TPrepareResult(STATUS_OK)
.setAccepted_conjuncts(accepted)
.setNum_rows_estimate(numRowsReturned);
}

/**
* If the value of the first predicate (assumed to be a STRING) starts with
* 'VALIDATE_PREDICATES##', we validate the remaining TPrepareParams.predicates
* against the spec that follows the 'VALIDATE_PREDICATES##' prefix and return true.
* The result of the validation is stored in validatePredicatesResult_.
*
* The expected predicates are specified in the form "{slot} {TComparisonOp} {val}",
* and conjunctive predicates are separated by '&&'.
*
* For example, the predicates_spec validates the predicates in the following query:
* select * from table_name
* where predicates_spec = 'VALIDATE_PREDICATES##x LT 1 && y GT 2' and
* x < 1 and
* 2 > y;
*
* Current limitations:
* - Disjunctive predicates are not supported (e.g. "expr1 or expr2")
* - Only INT is supported
*/
private boolean validatePredicates(List<List<TBinaryPredicate>> predicates) {
if (predicates == null || predicates.isEmpty()) return false;
TBinaryPredicate firstPredicate = predicates.get(0).get(0);
if (!firstPredicate.getValue().isSetString_val()) return false;
String colVal = firstPredicate.getValue().getString_val();
if (!colVal.toUpperCase().startsWith("VALIDATE_PREDICATES##")) return false;
String[] colValParts = colVal.split("##");
Preconditions.checkArgument(colValParts.length == 2);
String[] expectedPredicates = colValParts[1].split("&&");
Preconditions.checkArgument(expectedPredicates.length == predicates.size() - 1);
String result = "SUCCESS";
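// Check each remaining conjunct against its expected "{slot} {op} {val}" triple;
// any mismatch replaces the result with a failure message.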
for (int i = 1; i < predicates.size(); ++i) {
String[] predicateParts = expectedPredicates[i - 1].trim().split(" ");
Preconditions.checkArgument(predicateParts.length == 3);
TBinaryPredicate predicate =
Iterables.getOnlyElement(predicates.get(i));
Preconditions.checkArgument(predicate.getValue().isSetInt_val());
String slotName = predicate.getCol().getName().toUpperCase();
int intVal = predicate.getValue().getInt_val();
if (!predicateParts[0].toUpperCase().equals(slotName) ||
!predicateParts[1].toUpperCase().equals(predicate.getOp().name()) ||
!predicateParts[2].equals(Integer.toString(intVal))) {
result = "Failed predicate, expected=" + expectedPredicates[i - 1].trim() +
" actual=" + predicate.toString();
}
}
validatePredicatesResult_ = result;
return true;
}

/**
* Initializes the batch size and stores the table schema.
*/
@Override
public TOpenResult open(TOpenParams params) {
Preconditions.checkState(state_ == DataSourceState.CREATED);
state_ = DataSourceState.OPENED;
batchSize_ = INITIAL_BATCH_SIZE;
schema_ = params.getRow_schema();
// Need to call validatePredicates() again because the earlier call from prepare()
// happened in the frontend and used a different instance of this class.
if (validatePredicates(params.getPredicates())) {
// If validating predicates, only one STRING column should be selected.
Preconditions.checkArgument(schema_.getColsSize() == 1);
TColumnDesc firstCol = schema_.getCols().get(0);
TColumnType firstType = firstCol.getType();
Preconditions.checkState(firstType.getTypesSize() == 1);
Preconditions.checkState(firstType.types.get(0).getType() == TTypeNodeType.SCALAR);
Preconditions.checkArgument(
firstType.types.get(0).scalar_type.getType() == TPrimitiveType.STRING);
}
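// Hand back a unique scan handle so that subsequent getNext()/close() calls can
// be matched to this open scan.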
scanHandle_ = UUID.randomUUID().toString();
return new TOpenResult(STATUS_OK).setScan_handle(scanHandle_);
}

/**
* If validating predicates, returns a single row with the result of the validation.
* Otherwise returns row batches of generated rows based on the row index. Called
* multiple times, so the current row is stored between calls. Each row batch is a
* different size (not necessarily the size specified by TOpenParams.batch_size) to
* ensure that Impala can handle unexpected batch sizes.
*/
@Override
public TGetNextResult getNext(TGetNextParams params) {
Preconditions.checkState(state_ == DataSourceState.OPENED);
Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
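// Once all rows have been returned, keep reporting eos until close() is called.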
if (eos_) return new TGetNextResult(STATUS_OK).setEos(eos_);
if (validatePredicatesResult_ != null) {
TColumnData colData = new TColumnData();
colData.setIs_null(Lists.newArrayList(false));
colData.setString_vals(Lists.newArrayList(validatePredicatesResult_));
eos_ = true;
return new TGetNextResult(STATUS_OK).setEos(eos_)
.setRows(new TRowBatch().setCols(Lists.newArrayList(colData)).setNum_rows(1));
}
List<TColumnData> cols = Lists.newArrayList();
for (int i = 0; i < schema_.getColsSize(); ++i) {
cols.add(new TColumnData().setIs_null(Lists.<Boolean>newArrayList()));
}
int numAdded = 0;
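// Generate rows until this batch is full or all NUM_ROWS_RETURNED rows have been
// produced.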
while (currRow_ < NUM_ROWS_RETURNED && numAdded < batchSize_) {
addRow(cols);
++numAdded;
++currRow_;
}
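// Grow the next batch so Impala sees batch sizes that differ from the
// TOpenParams.batch_size hint.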
batchSize_ += BATCH_SIZE_INCREMENT;
if (currRow_ == NUM_ROWS_RETURNED) eos_ = true;
return new TGetNextResult(STATUS_OK).setEos(eos_)
.setRows(new TRowBatch().setCols(cols).setNum_rows(numAdded));
}

/**
* Adds a row to the set of columns. For the numeric types the value is derived from
* the row index (mod 10 for TINYINT, mod 100 for SMALLINT). For strings it is just
* the row index as a string, and every 5th value is null.
*/
private void addRow(List<TColumnData> cols) {
for (int i = 0; i < cols.size(); ++i) {
TColumnDesc colDesc = schema_.getCols().get(i);
TColumnData colData = cols.get(i);
TColumnType type = colDesc.getType();
Preconditions.checkState(type.getTypesSize() == 1);
if (type.types.get(0).getType() != TTypeNodeType.SCALAR) {
// Unsupported non-scalar type.
throw new UnsupportedOperationException("Unsupported column type: " +
type.types.get(0).getType());
}
TScalarType scalarType = type.types.get(0).scalar_type;
switch (scalarType.type) {
case TINYINT:
colData.addToIs_null(false);
colData.addToByte_vals((byte) (currRow_ % 10));
break;
case SMALLINT:
colData.addToIs_null(false);
colData.addToShort_vals((short) (currRow_ % 100));
break;
case INT:
case DATE:
colData.addToIs_null(false);
colData.addToInt_vals(currRow_);
break;
case BIGINT:
colData.addToIs_null(false);
colData.addToLong_vals((long) currRow_ * 10);
break;
case DOUBLE:
colData.addToIs_null(false);
colData.addToDouble_vals(currRow_);
break;
case FLOAT:
colData.addToIs_null(false);
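// Narrow to float precision first; FLOAT values travel in double_vals here.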
colData.addToDouble_vals((float) (1.1 * currRow_));
break;
case STRING:
if (currRow_ % 5 == 0) {
colData.addToIs_null(true);
} else {
colData.addToIs_null(false);
colData.addToString_vals(String.valueOf(currRow_));
}
break;
case BOOLEAN:
colData.addToIs_null(false);
colData.addToBool_vals(currRow_ % 2 == 0);
break;
case TIMESTAMP:
colData.addToIs_null(false);
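// new Timestamp(long) interprets currRow_ as milliseconds since the epoch.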
colData.addToBinary_vals(
SerializationUtils.encodeTimestamp(new Timestamp(currRow_)));
break;
case DECIMAL:
colData.addToIs_null(false);
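// Build an unscaled value just below 10^precision that decreases with the row
// index and alternates in sign, exercising the boundaries of the DECIMAL range.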
BigInteger maxUnscaled = BigInteger.TEN.pow(scalarType.getPrecision());
BigInteger val = maxUnscaled.subtract(BigInteger.valueOf(currRow_ + 1));
val = val.mod(maxUnscaled);
if (currRow_ % 2 == 0) val = val.negate();
colData.addToBinary_vals(SerializationUtils.encodeDecimal(new BigDecimal(val)));
break;
case BINARY:
case CHAR:
case DATETIME:
case INVALID_TYPE:
case NULL_TYPE:
default:
// Unsupported.
throw new UnsupportedOperationException("Unsupported column type: " +
scalarType.getType());
}
}
}

@Override
public TCloseResult close(TCloseParams params) {
Preconditions.checkState(state_ == DataSourceState.OPENED);
Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
state_ = DataSourceState.CLOSED;
return new TCloseResult(STATUS_OK);
}
}