/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Reads data from the Carbon store.
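 *
 * <p>A minimal usage sketch (illustrative only; the variable names below are placeholders): it
 * assumes a prepared {@link QueryModel}, a {@link CarbonReadSupport} that materializes rows as
 * {@code Object[]}, a Hadoop {@code Configuration} and an input split produced by the CarbonData
 * input format; exception handling is omitted.
 * <pre>{@code
 *   CarbonRecordReader<Object[]> reader =
 *       new CarbonRecordReader<>(queryModel, readSupport, hadoopConf);
 *   reader.initialize(split, taskAttemptContext);
 *   while (reader.nextKeyValue()) {
 *     Object[] row = reader.getCurrentValue();
 *     // consume the row
 *   }
 *   reader.close();
 * }</pre>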
 */
public class CarbonRecordReader<T> extends AbstractRecordReader<T> {

  protected QueryModel queryModel;

  protected CarbonReadSupport<T> readSupport;

  protected CarbonIterator<Object[]> carbonIterator;

  protected QueryExecutor queryExecutor;

  private InputMetricsStats inputMetricsStats;

  /**
   * Whether to skip clearing the datamap cache when this reader is closed. In some scenarios,
   * such as a datamap rebuild, this is set to true so that the datamap is cleared only after
   * the rebuild has finished.
   */
  private boolean skipClearDataMapAtClose = false;

  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
      InputMetricsStats inputMetricsStats, Configuration configuration) {
    this(queryModel, readSupport, configuration);
    this.inputMetricsStats = inputMetricsStats;
  }

  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
      Configuration configuration) {
    this.queryModel = queryModel;
    this.readSupport = readSupport;
    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
  }

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // The input split can contain a single HDFS block or multiple blocks, so first collect all
    // the blocks and then set them in the query model.
    List<CarbonInputSplit> splitList;
    if (inputSplit instanceof CarbonInputSplit) {
      splitList = new ArrayList<>(1);
      String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
      // The block footer offset is 0 in case of CarbonVectorizedReader, because it is filled in
      // lazily so that multiple threads can each read the footer for a small set of files
      // instead of the main thread setting it for all the files.
      if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
        FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
            context.getConfiguration());
        ByteBuffer buffer = reader
            .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath), inputSplit.getLength() - 8,
                8);
        ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
      }
      splitList.add((CarbonInputSplit) inputSplit);
    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
      // The split contains multiple blocks; this is an optimization for concurrent queries.
      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
      splitList = multiBlockSplit.getAllSplits();
    } else {
      throw new RuntimeException("unsupported input split type: " + inputSplit);
    }
    // Reuse the tableBlockInfos already present on the queryModel (fine-grained pruning case);
    // otherwise the pruning done before this method would be discarded.
    if (!queryModel.isFG()) {
      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
      queryModel.setTableBlockInfos(tableBlockInfoList);
    }
    readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
    try {
      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
    } catch (QueryExecutionException e) {
      throw new InterruptedException(e.getMessage());
    }
  }

  @Override public boolean nextKeyValue() {
    return carbonIterator.hasNext();
  }

  @Override public Void getCurrentKey() throws IOException, InterruptedException {
    return null;
  }

  @Override public T getCurrentValue() throws IOException, InterruptedException {
    rowCount += 1;
    if (null != inputMetricsStats) {
      inputMetricsStats.incrementRecordRead(1L);
    }
    return readSupport.readRow(carbonIterator.next());
  }

  /**
   * Get the next batch of rows.
   *
   * @return rows in the batch
   */
  public List<Object[]> getBatchValue() {
    if (null != inputMetricsStats) {
      inputMetricsStats.incrementRecordRead(1L);
    }
    List<Object[]> objects = ((ChunkRowIterator) carbonIterator).nextBatch();
    rowCount += objects.size();
    return objects;
  }

  @Override public float getProgress() throws IOException, InterruptedException {
    // TODO : Implement it based on total number of rows it is going to retrieve.
    return 0;
  }

  @Override public void close() throws IOException {
    logStatistics(rowCount, queryModel.getStatisticsRecorder());
    // clear dictionary cache
    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
    if (null != columnToDictionaryMapping) {
      for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
        CarbonUtil.clearDictionaryCache(entry.getValue());
      }
    }
    if (!skipClearDataMapAtClose) {
      // Clear the datamap cache
      DataMapStoreManager.getInstance().clearDataMaps(
          queryModel.getTable().getAbsoluteTableIdentifier());
    }
    // close read support
    readSupport.close();
    carbonIterator.close();
    try {
      queryExecutor.finish();
    } catch (QueryExecutionException e) {
      throw new IOException(e);
    }
  }

  public void setSkipClearDataMapAtClose(boolean skipClearDataMapAtClose) {
    this.skipClearDataMapAtClose = skipClearDataMapAtClose;
  }
}