blob: 33851706c528b09d6a94840b74b0a54ed2a5df5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.presto;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
import org.apache.carbondata.presto.impl.CarbonTableReader;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.*;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.*;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.carbondata.presto.Types.checkType;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
private final String connectorId;
private final CarbonTableReader carbonTableReader;
@Inject
public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.carbonTableReader = reader;
}
@Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
requireNonNull(split, "split is null");
requireNonNull(columns, "columns is null");
CarbondataSplit carbondataSplit =
checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
StringBuffer targetColsBuffer = new StringBuffer();
String targetCols = "";
// Convert all columns handles
ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
for (ColumnHandle handle : columns) {
handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
targetColsBuffer.append(((CarbondataColumnHandle) handle).getColumnName()).append(",");
}
// Build column projection(check the column order)
if (targetColsBuffer.length() > 0) {
targetCols = targetColsBuffer.substring(0, targetCols.length() - 1);
}
else
{
targetCols = null;
}
//String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
CarbonTableCacheModel tableCacheModel =
carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
checkNotNull(tableCacheModel, "tableCacheModel should not be null");
checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
// Build Query Model
CarbonTable targetTable = tableCacheModel.carbonTable;
CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
QueryModel queryModel =
QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(),
queryPlan, targetTable, new DataTypeConverterImpl());
// Push down filter
fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
// Return new record set
return new CarbondataRecordSet(targetTable, session, carbondataSplit,
handles.build(), queryModel);
}
// Build filter for QueryModel
private void fillFilter2QueryModel(QueryModel queryModel,
TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
//queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
//Build Predicate Expression
ImmutableList.Builder<Expression> filters = ImmutableList.builder();
Domain domain = null;
for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
// Build ColumnExpresstion for Expresstion(Carbondata)
CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
Type type = cdch.getColumnType();
DataType coltype = spi2CarbondataTypeMapper(cdch);
Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
domain = originalConstraint.getDomains().get().get(c);
checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
if (domain.getValues().isNone()) {
}
if (domain.getValues().isAll()) {
}
List<Object> singleValues = new ArrayList<>();
List<Expression> disjuncts = new ArrayList<>();
for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
if (range.isSingleValue()) {
singleValues.add(range.getLow().getValue());
} else {
List<Expression> rangeConjuncts = new ArrayList<>();
if (!range.getLow().isLowerUnbounded()) {
Object value = convertDataByType(range.getLow().getValue(), type);
switch (range.getLow().getBound()) {
case ABOVE:
if (type == TimestampType.TIMESTAMP) {
//todo not now
} else {
GreaterThanExpression greater = new GreaterThanExpression(colExpression,
new LiteralExpression(value, coltype));
rangeConjuncts.add(greater);
}
break;
case EXACTLY:
GreaterThanEqualToExpression greater =
new GreaterThanEqualToExpression(colExpression,
new LiteralExpression(value, coltype));
rangeConjuncts.add(greater);
break;
case BELOW:
throw new IllegalArgumentException("Low marker should never use BELOW bound");
default:
throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
}
}
if (!range.getHigh().isUpperUnbounded()) {
Object value = convertDataByType(range.getHigh().getValue(), type);
switch (range.getHigh().getBound()) {
case ABOVE:
throw new IllegalArgumentException("High marker should never use ABOVE bound");
case EXACTLY:
LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
new LiteralExpression(value, coltype));
rangeConjuncts.add(less);
break;
case BELOW:
LessThanExpression less2 =
new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
rangeConjuncts.add(less2);
break;
default:
throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
}
}
disjuncts.addAll(rangeConjuncts);
}
}
if (singleValues.size() == 1) {
Expression ex = null;
if (coltype.equals(DataType.STRING)) {
ex = new EqualToExpression(colExpression,
new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
} else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
Long value = (Long) singleValues.get(0) * 1000;
ex = new EqualToExpression(colExpression,
new LiteralExpression(value , coltype));
} else ex = new EqualToExpression(colExpression,
new LiteralExpression(singleValues.get(0), coltype));
filters.add(ex);
} else if (singleValues.size() > 1) {
ListExpression candidates = null;
List<Expression> exs = singleValues.stream().map((a) -> {
return new LiteralExpression(convertDataByType(a, type), coltype);
}).collect(Collectors.toList());
candidates = new ListExpression(exs);
if (candidates != null) filters.add(new InExpression(colExpression, candidates));
} else if (disjuncts.size() > 0) {
if (disjuncts.size() > 1) {
Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1));
if (disjuncts.size() > 2) {
for (int i = 2; i < disjuncts.size(); i++) {
filters.add(new AndExpression(finalFilters, disjuncts.get(i)));
}
}
} else if (disjuncts.size() == 1) filters.add(disjuncts.get(0));
}
}
Expression finalFilters;
List<Expression> tmp = filters.build();
if (tmp.size() > 1) {
finalFilters = new OrExpression(tmp.get(0), tmp.get(1));
if (tmp.size() > 2) {
for (int i = 2; i < tmp.size(); i++) {
finalFilters = new OrExpression(finalFilters, tmp.get(i));
}
}
} else if (tmp.size() == 1) finalFilters = tmp.get(0);
else return;
// todo set into QueryModel
CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
queryModel.setFilterExpressionResolverTree(
CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
}
public static DataType spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
Type colType = carbondataColumnHandle.getColumnType();
if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
else if (colType == IntegerType.INTEGER) return DataType.INT;
else if (colType == BigintType.BIGINT) return DataType.LONG;
else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
else if (colType == VarcharType.VARCHAR) return DataType.STRING;
else if (colType == DateType.DATE) return DataType.DATE;
else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
carbondataColumnHandle.getScale())) return DataType.DECIMAL;
else return DataType.STRING;
}
public Object convertDataByType(Object rawdata, Type type) {
if (type.equals(IntegerType.INTEGER)) return Integer.valueOf(rawdata.toString());
else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
return rawdata;
}
}