blob: 661e83f3587bf123b9af3325c79267eb8c4dfd35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.presto;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.BatchResult;
import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.Type;
import scala.Tuple3;
import static org.apache.carbondata.presto.Types.checkType;
//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
public class CarbondataRecordSet implements RecordSet {
private CarbonTable carbonTable;
private TupleDomain<ColumnHandle> originalConstraint;
private Expression carbonConstraint;
private List<CarbondataColumnConstraint> rebuildConstraints;
private QueryModel queryModel;
private CarbondataSplit split;
private List<CarbondataColumnHandle> columns;
private QueryExecutor queryExecutor;
private CarbonDictionaryDecodeReaderSupport readSupport;
public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
this.carbonTable = carbonTable;
this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
this.originalConstraint = this.split.getConstraints();
this.rebuildConstraints = this.split.getRebuildConstraints();
this.queryModel = queryModel;
this.columns = columns;
this.readSupport = new CarbonDictionaryDecodeReaderSupport();
}
//todo support later
private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
return null;
}
@Override public List<Type> getColumnTypes() {
return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
}
/**
* get data blocks via Carbondata QueryModel API
*/
@Override public RecordCursor cursor() {
List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
split.getLocalInputSplit().getLocations().toArray(new String[0]),
split.getLocalInputSplit().getLength(), new BlockletInfos(),
//blockletInfos,
ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null));
queryModel.setTableBlockInfos(tableBlockInfoList);
queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
try {
Tuple3[] dict = readSupport
.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
CarbonIterator<Object[]> carbonIterator =
new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
RecordCursor rc =
new CarbondataRecordCursor(readSupport, carbonIterator, columns, split, dict);
return rc;
} catch (QueryExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}