/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.hadoop.AbstractRecordReader;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
/**
* A specialized RecordReader that reads directly into {@link CarbonColumnarBatch} instances
* using the CarbonData columnar APIs, filling the batch column by column instead of
* materializing rows.
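* <p>A minimal usage sketch (illustrative only; it assumes the caller has already built a
* {@link QueryModel} and obtained a {@link CarbonInputSplit}, e.g. from the input format):
* <pre>
*   CarbonVectorizedRecordReader reader = new CarbonVectorizedRecordReader(queryModel);
*   reader.initialize(split, context);
*   while (reader.nextKeyValue()) {
*     Object[] row = (Object[]) reader.getCurrentValue();
*     // consume row ...
*   }
*   reader.close();
* </pre>
*/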
public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
private CarbonColumnarBatch carbonColumnarBatch;
private QueryExecutor queryExecutor;
// index of the next row to serve from the current batch
private int batchIdx = 0;
// number of valid rows in the current batch
private int numBatched = 0;
private AbstractDetailQueryResultIterator iterator;
private QueryModel queryModel;
// Maps each projection column index to the vector index its value is fetched from.
// When the same column appears multiple times in the projection, the column is scanned
// only once and later occurrences reuse the index of the first occurrence.
private List<Integer> projectionMapping = new ArrayList<>();
public CarbonVectorizedRecordReader(QueryModel queryModel) {
this.queryModel = queryModel;
}
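/**
* Prepares the reader for the given split: if the block footer offset is not already known,
* it is read from the last 8 bytes of the carbondata file, then the query executor and the
* detail query result iterator are created.
*/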
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
List<CarbonInputSplit> splitList;
if (inputSplit instanceof CarbonInputSplit) {
// Read the footer offset from the file tail and set it on the split if it is missing.
CarbonInputSplit carbonInputSplit = ((CarbonInputSplit) inputSplit);
String splitPath = carbonInputSplit.getFilePath();
if ((null != carbonInputSplit.getDetailInfo()
&& carbonInputSplit.getDetailInfo().getBlockFooterOffset() == 0L) || (
null == carbonInputSplit.getDetailInfo() && carbonInputSplit.getStart() == 0)) {
FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
taskAttemptContext.getConfiguration());
ByteBuffer buffer = reader
.readByteBuffer(FileFactory.getUpdatedFilePath(splitPath),
((CarbonInputSplit) inputSplit).getLength() - 8,
8);
if (carbonInputSplit.getDetailInfo() == null) {
carbonInputSplit.setStart(buffer.getLong());
} else {
carbonInputSplit.getDetailInfo().setBlockFooterOffset(buffer.getLong());
}
reader.finish();
}
splitList = new ArrayList<>(1);
splitList.add((CarbonInputSplit) inputSplit);
} else {
throw new RuntimeException("unsupported input split type: " + inputSplit);
}
List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
queryModel.setTableBlockInfos(tableBlockInfoList);
queryModel.setVectorReader(true);
try {
queryExecutor =
QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
initBatch();
} catch (QueryExecutionException e) {
LOGGER.error(e);
throw new InterruptedException(e.getMessage());
} catch (Exception e) {
LOGGER.error(e);
throw e;
}
}
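/**
* Advances the row cursor within the current batch, pulling the next batch from the result
* iterator once the current one is exhausted.
*/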
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (batchIdx >= numBatched) {
if (!nextBatch()) return false;
}
++batchIdx;
return true;
}
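// Resets the batch and fills it from the iterator; returns false once no batches remain.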
private boolean nextBatch() {
carbonColumnarBatch.reset();
if (iterator.hasNext()) {
iterator.processNextBatch(carbonColumnarBatch);
numBatched = carbonColumnarBatch.getActualSize();
batchIdx = 0;
return true;
}
return false;
}
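/**
* Lazily allocates the columnar batch: builds one StructField per projected dimension and
* measure (decimals keep their precision and scale, any other unsupported measure type falls
* back to DOUBLE), allocates a column vector per field, and fills projectionMapping so that
* a column projected more than once is scanned only once.
*/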
private void initBatch() {
if (carbonColumnarBatch == null) {
List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
for (ProjectionDimension dim : queryDimension) {
fields[dim.getOrdinal()] =
new StructField(dim.getColumnName(), dim.getDimension().getDataType());
}
for (ProjectionMeasure msr : queryMeasures) {
DataType dataType = msr.getMeasure().getDataType();
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
|| dataType == DataTypes.INT || dataType == DataTypes.LONG
|| dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE
|| dataType == DataTypes.BINARY) {
fields[msr.getOrdinal()] =
new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
} else if (DataTypes.isDecimal(dataType)) {
fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
DataTypes.createDecimalType(msr.getMeasure().getPrecision(),
msr.getMeasure().getScale()));
} else {
fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
}
}
CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
Map<String, Integer> colmap = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
vectors[i] = new CarbonColumnVectorImpl(
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
fields[i].getDataType());
if (colmap.containsKey(fields[i].getFieldName())) {
int reusedIndex = colmap.get(fields[i].getFieldName());
projectionMapping.add(reusedIndex);
} else {
colmap.put(fields[i].getFieldName(), i);
projectionMapping.add(i);
}
}
carbonColumnarBatch = new CarbonColumnarBatch(vectors,
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
new boolean[] {});
}
}
// If the same column appears in the projection more than once, the logic below scans it
// only once. E.g. for projection cols C1,C2,C3,C2, projectionMapping holds [0,1,2,1].
// The output row is assembled according to projectionMapping.
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
rowCount += 1;
Object[] row = new Object[projectionMapping.size()];
for (int i = 0; i < projectionMapping.size(); i++) {
// projectionMapping.get(i) < i means this column's value was already fetched for an earlier slot
if (projectionMapping.get(i) < i) {
row[i] = row[projectionMapping.get(i)];
} else {
Object data = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
.getData(batchIdx - 1);
if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
|| carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
if (data == null) {
row[i] = null;
} else {
row[i] = ByteUtil.toString((byte[]) data, 0, ((byte[]) data).length);
}
} else if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.BOOLEAN) {
if (data == null) {
row[i] = null;
} else {
row[i] = ByteUtil.toBoolean((byte) data);
}
} else {
// the value was already fetched above; reuse it instead of reading the vector again
row[i] = data;
}
}
}
return row;
}
@Override
public Void getCurrentKey() throws IOException, InterruptedException {
throw new UnsupportedOperationException("Operation not allowed on CarbonVectorizedReader");
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO: implement based on the total number of rows to be retrieved.
return 0;
}
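/**
* Logs the scan statistics, releases the columnar batch and the result iterator, and shuts
* down the query executor.
*/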
@Override
public void close() throws IOException {
logStatistics(rowCount, queryModel.getStatisticsRecorder());
carbonColumnarBatch = null;
if (iterator != null) {
iterator.close();
}
try {
queryExecutor.finish();
} catch (QueryExecutionException e) {
throw new IOException(e);
}
}
}