/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.stream;

import java.io.IOException;

import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.hadoop.InputMetricsStats;
import org.apache.carbondata.hadoop.stream.StreamRecordReader;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Stream record reader for CarbonData streaming segments. It supports both vectorized
 * (columnar batch) reads and row-by-row reads over the same underlying stream input.
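 *
 * <p>Typical Hadoop RecordReader lifecycle (a sketch: {@code split} and {@code context} come
 * from the enclosing input format, and {@code initialize} is inherited from the parent reader):
 * <pre>
 *   reader.initialize(split, context);
 *   while (reader.nextKeyValue()) {
 *     Object value = reader.getCurrentValue(); // ColumnarBatch in vector mode, else InternalRow
 *   }
 *   reader.close();
 * </pre>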
 */
public class CarbonStreamRecordReader extends StreamRecordReader {

  // true to return columnar batches, false to return one row at a time
  protected boolean isVectorReader;
  private CarbonVectorProxy vectorProxy;
  private StructType outputSchema;
  private InternalRow outputRow;

  // Spark input metrics collector, may be null when metrics are not tracked
  private InputMetricsStats inputMetricsStats;

  public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
      QueryModel queryModel, boolean useRawRow) {
    super(queryModel, useRawRow);
    this.isVectorReader = isVectorReader;
    this.inputMetricsStats = inputMetricsStats;
  }
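
  /**
   * Completes parent initialization, then builds the Spark output schema from the Carbon
   * projection and wraps the shared outputValues array in a reusable output row.
   */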
  @Override
  protected void initializeAtFirstRow() throws IOException {
    super.initializeAtFirstRow();
    outputRow = new GenericInternalRow(outputValues);
    outputSchema = new StructType((StructField[])
        DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
  }
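
  /**
   * Advances the reader, initializing lazily on the first call. In vector mode this fills the
   * next non-empty columnar batch; in row mode it decodes the next matching row.
   */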
  @Override
  public boolean nextKeyValue() throws IOException {
    if (isFirstRow) {
      isFirstRow = false;
      initializeAtFirstRow();
    }
    if (isFinished) {
      return false;
    }
    if (isVectorReader) {
      return nextColumnarBatch();
    }
    return nextRow();
  }
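
  /**
   * Returns the current value and updates the input metrics: the columnar batch held by the
   * vector proxy in vector mode, otherwise the reusable row holding the current values.
   */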
  @Override
  public Object getCurrentValue() {
    if (isVectorReader) {
      int rowCount = vectorProxy.numRows();
      if (inputMetricsStats != null) {
        inputMetricsStats.incrementRecordRead((long) rowCount);
      }
      return vectorProxy.getColumnarBatch();
    }
    if (inputMetricsStats != null) {
      inputMetricsStats.incrementRecordRead(1L);
    }
    return outputRow;
  }

  /**
   * For the vector reader: advances blocklet by blocklet until one yields a non-empty batch,
   * skipping blocklets whose header shows no scan is required.
   */
  private boolean nextColumnarBatch() throws IOException {
    boolean hasNext;
    boolean scanMore = false;
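    // keep scanning while blocklets are skipped or produce empty batches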
    do {
      // move to the next blocklet
      hasNext = input.nextBlocklet();
      if (hasNext) {
        // read blocklet header
        BlockletHeader header = input.readBlockletHeader();
        if (isScanRequired(header)) {
          scanMore = !scanBlockletAndFillVector(header);
        } else {
          input.skipBlockletData(true);
          scanMore = true;
        }
      } else {
        isFinished = true;
        scanMore = false;
      }
    } while (scanMore);
    return hasNext;
  }
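
  /**
   * Decodes one blocklet into the vector proxy. Returns true if at least one row survives the
   * filter, i.e. the resulting batch is worth returning to the caller.
   */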
  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
    // no filter and an empty output projection: nothing needs to be decoded, the row count
    // from the blocklet header is enough to build the batch
    if (skipScanData) {
      int rowNums = header.getBlocklet_info().getNum_rows();
      vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, rowNums, false);
      vectorProxy.setNumRows(rowNums);
      input.skipBlockletData(true);
      return rowNums > 0;
    }
    input.readBlockletData(header);
    vectorProxy =
        new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, input.getRowNums(), false);
    int rowNum = 0;
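    // decode rows from the stream; with a filter, only matching rows go into the batch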
    if (null == filter) {
      while (input.hasNext()) {
        readRowFromStream();
        putRowToColumnBatch(rowNum++);
      }
    } else {
      try {
        while (input.hasNext()) {
          readRowFromStream();
          if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
            putRowToColumnBatch(rowNum++);
          }
        }
      } catch (FilterUnsupportedException e) {
        throw new IOException("Failed to filter row in vector reader", e);
      }
    }
    vectorProxy.setNumRows(rowNum);
    return rowNum > 0;
  }
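
  /**
   * Copies the decoded values of the current row into each projected column vector at the
   * given row position.
   */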
  private void putRowToColumnBatch(int rowId) {
    for (int i = 0; i < projection.length; i++) {
      Object value = outputValues[i];
      vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId, value);
    }
  }
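
  /**
   * Closes the parent reader and releases the off-heap memory held by the vector proxy.
   */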
  @Override
  public void close() throws IOException {
    super.close();
    if (null != vectorProxy) {
      vectorProxy.close();
    }
  }
}