/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.api;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapChooser;
import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapWrapperSimpleInfo;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.log4j.Logger;
/**
* Base class for carbondata input format, there are two input format implementations:
* 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support.
*
* 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support,
* such as segment and explicit schema metadata.
*
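* A minimal usage sketch (illustrative only; the exact setup depends on the concrete
* input format subclass and the engine integration):
* <pre>
*   Configuration conf = job.getConfiguration();
*   CarbonInputFormat.setDatabaseName(conf, "default");
*   CarbonInputFormat.setTableName(conf, "my_table");
*   CarbonInputFormat.setTablePath(conf, "/path/to/table");
*   CarbonInputFormat.setColumnProjection(conf, new String[] { "c1", "c2" });
* </pre>
*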
* @param <T>
*/
public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// comma separated list of input segment numbers
public static final String INPUT_SEGMENT_NUMBERS =
"mapreduce.input.carboninputformat.segmentnumbers";
// partition id(s) used by alter partition
private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
private static final Logger LOG =
LogServiceFactory.getLogService(CarbonInputFormat.class.getName());
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_TRANSACTIONAL_TABLE =
"mapreduce.input.carboninputformat.transactional";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
private static final String PARTITIONS_TO_PRUNE =
"mapreduce.input.carboninputformat.partitions.to.prune";
private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
private static final String READ_COMMITTED_SCOPE =
"mapreduce.input.carboninputformat.read.committed.scope";
// record the number of segments, stream files and hit blocks
protected int numSegments = 0;
protected int numStreamSegments = 0;
protected int numStreamFiles = 0;
protected int hitedStreamFiles = 0;
protected int numBlocks = 0;
protected List fileLists = null;
private CarbonTable carbonTable;
public int getNumSegments() {
return numSegments;
}
public int getNumStreamSegments() {
return numStreamSegments;
}
public int getNumStreamFiles() {
return numStreamFiles;
}
public int getHitedStreamFiles() {
return hitedStreamFiles;
}
public int getNumBlocks() {
return numBlocks;
}
public void setFileLists(List fileLists) {
this.fileLists = fileLists;
}
/**
* Set the `tableInfo` in `configuration`
*/
public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
throws IOException {
if (null != tableInfo) {
configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
}
}
/**
* Get TableInfo object from `configuration`
*/
protected static TableInfo getTableInfo(Configuration configuration) throws IOException {
String tableInfoStr = configuration.get(TABLE_INFO);
if (tableInfoStr == null) {
return null;
} else {
TableInfo output = new TableInfo();
output.readFields(new DataInputStream(
new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
return output;
}
}
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
public CarbonTable getOrCreateCarbonTable(Configuration configuration)
throws IOException {
if (carbonTable == null) {
// carbon table should be created either from deserialized table info (schema saved in
// hive metastore) or by reading schema in HDFS (schema saved in HDFS)
TableInfo tableInfo = getTableInfo(configuration);
CarbonTable carbonTable;
if (tableInfo != null) {
carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
} else {
carbonTable = SchemaReader.readCarbonTableFromStore(
getAbsoluteTableIdentifier(configuration));
}
this.carbonTable = carbonTable;
return carbonTable;
} else {
return this.carbonTable;
}
}
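/**
 * Set the table path as the input directory (FileInputFormat.INPUT_DIR) in `configuration`
 */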
public static void setTablePath(Configuration configuration, String tablePath) {
configuration.set(FileInputFormat.INPUT_DIR, tablePath);
}
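/**
 * Set whether the table is a transactional table in `configuration`
 */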
public static void setTransactionalTable(Configuration configuration,
boolean isTransactionalTable) {
configuration.set(CARBON_TRANSACTIONAL_TABLE, String.valueOf(isTransactionalTable));
}
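/**
 * Set the list of partition ids used by alter partition in `configuration`
 */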
public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
}
/**
 * It sets unresolved filter expression.
 *
 * @param configuration
 * @param filterExpression
 */
public static void setFilterPredicates(Configuration configuration,
DataMapFilter filterExpression) {
if (filterExpression == null || filterExpression.getExpression() == null) {
return;
}
try {
String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
configuration.set(FILTER_PREDICATE, filterString);
} catch (Exception e) {
throw new RuntimeException("Error while setting filter expression to Job", e);
}
}
/**
* Set the column projection column names
*
* @param configuration Configuration info
* @param projectionColumns projection columns name
*/
public static void setColumnProjection(Configuration configuration, String[] projectionColumns) {
Objects.requireNonNull(projectionColumns);
if (projectionColumns.length < 1) {
throw new RuntimeException("Projection can't be empty");
}
StringBuilder builder = new StringBuilder();
for (String column : projectionColumns) {
builder.append(column).append(",");
}
String columnString = builder.toString();
columnString = columnString.substring(0, columnString.length() - 1);
configuration.set(COLUMN_PROJECTION, columnString);
}
/**
* Set the column projection column names from CarbonProjection
*
* @param configuration Configuration info
* @param projection CarbonProjection object that includes unique projection column name
*/
public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
if (projection == null || projection.isEmpty()) {
return;
}
String[] allColumns = projection.getAllColumns();
StringBuilder builder = new StringBuilder();
for (String column : allColumns) {
builder.append(column).append(",");
}
String columnString = builder.toString();
columnString = columnString.substring(0, columnString.length() - 1);
configuration.set(COLUMN_PROJECTION, columnString);
}
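/**
 * Get the comma separated projection column names from `configuration`, or null if not set
 */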
public static String getColumnProjection(Configuration configuration) {
return configuration.get(COLUMN_PROJECTION);
}
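/**
 * Enable or disable FG datamap pruning in `configuration`
 */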
public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
}
public static boolean isFgDataMapPruningEnable(Configuration configuration) {
String enable = configuration.get(FGDATAMAP_PRUNING);
// if FGDATAMAP_PRUNING is not set, FG datamap pruning is enabled by default
return (enable == null) || enable.equalsIgnoreCase("true");
}
/**
* Set list of segments to access
*/
public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
}
/**
* Set `CARBON_INPUT_SEGMENTS` from property to configuration
*/
public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
setQuerySegmentToAccess(conf, dbName, tbName);
}
/**
 * Set the given comma separated segment list as the segments to access, unless it is "*"
 */
public static void setQuerySegment(Configuration conf, String segmentList) {
if (!segmentList.trim().equals("*")) {
CarbonInputFormat
.setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(","), null));
}
}
/**
* set list of partitions to prune
*/
public static void setPartitionsToPrune(Configuration configuration,
List<PartitionSpec> partitions) {
if (partitions == null) {
return;
}
try {
String partitionString =
ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
configuration.set(PARTITIONS_TO_PRUNE, partitionString);
} catch (Exception e) {
throw new RuntimeException(
"Error while setting partition information to Job" + partitions, e);
}
}
/**
* get list of partitions to prune
*/
public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
throws IOException {
String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
if (partitionString != null) {
return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
}
return null;
}
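/**
 * Build the AbsoluteTableIdentifier from the table path, database name and table name
 * stored in `configuration`
 */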
public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
String tablePath = configuration.get(INPUT_DIR, "");
try {
return AbsoluteTableIdentifier
.from(tablePath, getDatabaseName(configuration), getTableName(configuration));
} catch (InvalidConfigurationException e) {
throw new IOException(e);
}
}
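/**
 * Set the serialized ReadCommittedScope in `configuration`
 */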
public static void setReadCommittedScope(Configuration configuration,
ReadCommittedScope committedScope) {
if (committedScope == null) {
return;
}
try {
String subFoldersString = ObjectSerializationUtil.convertObjectToString(committedScope);
configuration.set(READ_COMMITTED_SCOPE, subFoldersString);
} catch (Exception e) {
throw new RuntimeException(
"Error while setting committedScope information to Job" + committedScope, e);
}
}
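/**
 * Get the ReadCommittedScope from `configuration`, or null if not set
 */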
public static ReadCommittedScope getReadCommittedScope(Configuration configuration)
throws IOException {
String subFoldersString = configuration.get(READ_COMMITTED_SCOPE);
if (subFoldersString != null) {
return (ReadCommittedScope) ObjectSerializationUtil.convertStringToObject(subFoldersString);
}
return null;
}
/**
* {@inheritDoc}
* The configuration FileInputFormat.INPUT_DIR is used to get the table path to read.
*
* @param job
* @return List<InputSplit> list of CarbonInputSplit
* @throws IOException
*/
@Override
public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
/**
* This method will execute a distributed job (DistributedDataMapJob) to get the count for the
* table. If the DistributedDataMapJob fails for some reason then an embedded job is fired to
* get the count.
*/
Long getDistributedCount(CarbonTable table,
List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException {
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
partitionNames, false, null, false, false);
dataMapFormat.setIsWriteToFile(false);
try {
DataMapJob dataMapJob =
(DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
if (dataMapJob == null) {
throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
}
return dataMapJob.executeCountJob(dataMapFormat);
} catch (Exception e) {
LOG.error("Failed to get count from index server. Initiating fallback to embedded mode", e);
DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
return dataMapJob.executeCountJob(dataMapFormat);
}
}
List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table,
List<PartitionSpec> partitionNames, List<Segment> validSegments,
List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) throws IOException {
return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
segmentsToBeRefreshed, true);
}
private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
List<Segment> validSegments, List<Segment> invalidSegments,
List<String> segmentsToBeRefreshed, boolean isCountJob) throws IOException {
try {
DataMapJob dataMapJob =
(DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
if (dataMapJob == null) {
throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
}
return DataMapUtil
.executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
} catch (Exception e) {
// If fallback is disabled for testing purposes, directly throw the exception.
if (CarbonProperties.getInstance().isFallBackDisabled()) {
throw e;
}
LOG.error("Exception occurred while getting splits using index server. Initiating fallback "
+ "to embedded mode", e);
return DataMapUtil.executeDataMapJob(table, filterResolverIntf,
DataMapUtil.getEmbeddedJob(), partitionNames, validSegments,
invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
}
}
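/**
 * Get the DataMapFilter set by setFilterPredicates from `configuration` and bind it to the
 * CarbonTable, or return null if no filter is set
 */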
protected DataMapFilter getFilterPredicates(Configuration configuration) {
try {
String filterExprString = configuration.get(FILTER_PREDICATE);
if (filterExprString == null) {
return null;
}
DataMapFilter filter =
(DataMapFilter) ObjectSerializationUtil.convertStringToObject(filterExprString);
if (filter != null) {
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
filter.setTable(carbonTable);
}
return filter;
} catch (IOException e) {
throw new RuntimeException("Error while reading filter expression", e);
}
}
/**
* get data blocks of given segment
*/
protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
DataMapFilter expression, List<Segment> segmentIds,
List<Segment> invalidSegments, List<String> segmentsToBeRefreshed)
throws IOException {
QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
QueryStatistic statistic = new QueryStatistic();
// get tokens for all the required FileSystem for table path
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
List<ExtendedBlocklet> prunedBlocklets =
getPrunedBlocklets(job, carbonTable, expression, segmentIds, invalidSegments,
segmentsToBeRefreshed);
List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
for (ExtendedBlocklet blocklet : prunedBlocklets) {
// matchedPartitions variable will be null in two cases as follows
// 1. the table is not a partition table
// 2. the table is a partition table, and all partitions are matched by the query
// for a partition table, the task id of the carbondata file name is the partition id.
// if this partition is not required, it will be skipped here.
resultFilteredBlocks.add(blocklet.getInputSplit());
}
statistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
return resultFilteredBlocks;
}
/**
* for the explain command
* get the number of blocks by counting distinct file paths of blocklets
*/
private int getBlockCount(List<ExtendedBlocklet> blocklets) {
Set<String> filepaths = new HashSet<>();
for (ExtendedBlocklet blocklet: blocklets) {
filepaths.add(blocklet.getPath());
}
return filepaths.size();
}
/**
* Prune the blocklets using the filter expression with the available datamaps.
* First prune with the default blocklet datamap, then prune with CG and FG datamaps.
*/
private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
DataMapFilter filter, List<Segment> segmentIds, List<Segment> invalidSegments,
List<String> segmentsToBeRefreshed) throws IOException {
ExplainCollector.addPruningInfo(carbonTable.getTableName());
filter = filter == null ? new DataMapFilter(carbonTable, null) : filter;
ExplainCollector.setFilterStatement(
filter.getExpression() == null ? "none" : filter.getExpression().getStatement());
boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
DataMapJob dataMapJob = DataMapUtil.getDataMapJob(job.getConfiguration());
List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
// First prune using default datamap on driver side.
TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
List<ExtendedBlocklet> prunedBlocklets = null;
// This is to log the event, so the user will know what is happening from the logs.
LOG.info("Started block pruning ...");
boolean isDistributedPruningEnabled = CarbonProperties.getInstance()
.isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName());
if (isDistributedPruningEnabled) {
try {
prunedBlocklets =
getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
invalidSegments, segmentsToBeRefreshed, false);
} catch (Exception e) {
// If fallback is disabled, directly throw the exception; otherwise fall back to driver
// pruning.
if (CarbonProperties.getInstance().isFallBackDisabled()) {
throw e;
}
prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
}
} else {
prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
if (ExplainCollector.enabled()) {
ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
}
if (prunedBlocklets.size() == 0) {
return prunedBlocklets;
}
DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
// Get the available CG datamaps and prune further.
DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver());
if (cgDataMapExprWrapper != null) {
// Prune segments from already pruned blocklets
DataMapUtil.pruneSegments(segmentIds, prunedBlocklets);
List<ExtendedBlocklet> cgPrunedBlocklets = new ArrayList<>();
boolean isCGPruneFallback = false;
// Again prune with CG datamap.
try {
if (distributedCG && dataMapJob != null) {
cgPrunedBlocklets = DataMapUtil
.executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList<String>());
} else {
cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
}
} catch (Exception e) {
isCGPruneFallback = true;
LOG.error("CG datamap pruning failed.", e);
}
// If isCGPruneFallback = true, it means that CG datamap pruning failed,
// hence no need to do intersect and simply pass the prunedBlocklets from default datamap
if (!isCGPruneFallback) {
// since the index datamap prunes in segment scope,
// the result needs to be intersected with the previously pruned result
prunedBlocklets =
intersectFilteredBlocklets(carbonTable, prunedBlocklets, cgPrunedBlocklets);
}
if (ExplainCollector.enabled()) {
ExplainCollector.recordCGDataMapPruning(
DataMapWrapperSimpleInfo.fromDataMapWrapper(cgDataMapExprWrapper),
prunedBlocklets.size(), getBlockCount(prunedBlocklets));
}
}
if (prunedBlocklets.size() == 0) {
return prunedBlocklets;
}
// Now try to prune with FG DataMap.
if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver());
List<ExtendedBlocklet> fgPrunedBlocklets;
if (fgDataMapExprWrapper != null) {
// Prune segments from already pruned blocklets
DataMapUtil.pruneSegments(segmentIds, prunedBlocklets);
fgPrunedBlocklets = DataMapUtil
.executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(),
new ArrayList<String>());
// note that the 'fgPrunedBlocklets' has extra datamap related info compared with
// 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
prunedBlocklets =
intersectFilteredBlocklets(carbonTable, prunedBlocklets, fgPrunedBlocklets);
ExplainCollector.recordFGDataMapPruning(
DataMapWrapperSimpleInfo.fromDataMapWrapper(fgDataMapExprWrapper),
prunedBlocklets.size(), getBlockCount(prunedBlocklets));
}
}
}
LOG.info("Finished block pruning ...");
return prunedBlocklets;
}
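/**
 * Intersect the blocklets pruned by the previous datamap with those pruned by another datamap
 */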
private List<ExtendedBlocklet> intersectFilteredBlocklets(CarbonTable carbonTable,
List<ExtendedBlocklet> previousDataMapPrunedBlocklets,
List<ExtendedBlocklet> otherDataMapPrunedBlocklets) {
List<ExtendedBlocklet> prunedBlocklets = null;
if (BlockletDataMapUtil.isCacheLevelBlock(carbonTable)) {
prunedBlocklets = new ArrayList<>();
for (ExtendedBlocklet otherBlocklet : otherDataMapPrunedBlocklets) {
if (previousDataMapPrunedBlocklets.contains(otherBlocklet)) {
prunedBlocklets.add(otherBlocklet);
}
}
} else {
prunedBlocklets = (List) CollectionUtils
.intersection(otherDataMapPrunedBlocklets, previousDataMapPrunedBlocklets);
}
return prunedBlocklets;
}
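/**
 * Convert the pruned blocklets to CarbonInputSplit based input splits, skipping null blocklets
 */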
static List<InputSplit> convertToCarbonInputSplit(List<ExtendedBlocklet> extendedBlocklets) {
List<InputSplit> resultFilteredBlocks = new ArrayList<>();
for (ExtendedBlocklet blocklet : extendedBlocklets) {
if (blocklet != null) {
resultFilteredBlocks.add(blocklet.getInputSplit());
}
}
return resultFilteredBlocks;
}
@Override
public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext,
getFilterPredicates(taskAttemptContext.getConfiguration()));
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
return new CarbonRecordReader<T>(queryModel, readSupport,
taskAttemptContext.getConfiguration());
}
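/**
 * Create a QueryModel for the given split using the filter predicate from the configuration
 */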
public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
return createQueryModel(inputSplit, taskAttemptContext,
getFilterPredicates(taskAttemptContext.getConfiguration()));
}
public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext,
DataMapFilter dataMapFilter) throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// set projection column in the query model
String projectionString = getColumnProjection(configuration);
String[] projectColumns;
if (projectionString != null) {
projectColumns = projectionString.split(",");
} else {
projectColumns = new String[]{};
}
if (dataMapFilter != null) {
checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit);
}
return new QueryModelBuilder(carbonTable)
.projectColumns(projectColumns)
.filterExpression(dataMapFilter)
.dataConverter(getDataTypeConverter(configuration))
.build();
}
/**
* This method will create an Implicit Expression and set it as the right child of the given
* expression
*
* @param expression
* @param inputSplit
*/
private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) {
if (inputSplit instanceof CarbonMultiBlockSplit) {
CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
List<CarbonInputSplit> splits = split.getAllSplits();
// iterate over all the splits and create a block to blocklet mapping
Map<String, Set<Integer>> blockIdToBlockletIdMapping = new HashMap<>();
for (CarbonInputSplit carbonInputSplit : splits) {
Set<Integer> validBlockletIds = carbonInputSplit.getValidBlockletIds();
if (null != validBlockletIds && !validBlockletIds.isEmpty()) {
String uniqueBlockPath = carbonInputSplit.getFilePath();
String shortBlockPath = CarbonTablePath
.getShortBlockId(uniqueBlockPath.substring(uniqueBlockPath.lastIndexOf("/Part") + 1));
blockIdToBlockletIdMapping.put(shortBlockPath, validBlockletIds);
}
}
if (!blockIdToBlockletIdMapping.isEmpty()) {
// create implicit expression and set as right child
FilterUtil
.createImplicitExpressionAndSetAsRightChild(expression, blockIdToBlockletIdMapping);
}
}
}
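/**
 * Instantiate the CarbonReadSupport configured via `CARBON_READ_SUPPORT`; if it is not set,
 * a DictionaryDecodeReadSupport is returned
 */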
public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
// by default it uses the dictionary decode read support class
CarbonReadSupport<T> readSupport = null;
if (readSupportClass != null) {
try {
Class<?> myClass = Class.forName(readSupportClass);
Constructor<?> constructor = myClass.getConstructors()[0];
Object object = constructor.newInstance();
if (object instanceof CarbonReadSupport) {
readSupport = (CarbonReadSupport) object;
}
} catch (ClassNotFoundException ex) {
LOG.error("Class " + readSupportClass + " not found", ex);
} catch (Exception ex) {
LOG.error("Error while creating " + readSupportClass, ex);
}
} else {
readSupport = new DictionaryDecodeReadSupport<>();
}
return readSupport;
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
try {
// Don't split the file if it is on a local file system
FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
if (fileSystem instanceof LocalFileSystem) {
return false;
}
} catch (Exception e) {
return true;
}
return true;
}
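/**
 * Set the CarbonReadSupport implementation class in `configuration`
 */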
public static void setCarbonReadSupport(Configuration configuration,
Class<? extends CarbonReadSupport> readSupportClass) {
if (readSupportClass != null) {
configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
}
}
/**
* It is optional; if the user does not set it, the default DataTypeConverterImpl is used
*
* @param configuration
* @param converterClass is the Data type converter for different computing engine
*/
public static void setDataTypeConverter(
Configuration configuration, Class<? extends DataTypeConverter> converterClass) {
if (null != converterClass) {
configuration.set(CARBON_CONVERTER, converterClass.getCanonicalName());
}
}
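/**
 * Get the configured DataTypeConverter from `configuration`, defaulting to DataTypeConverterImpl
 */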
public static DataTypeConverter getDataTypeConverter(Configuration configuration)
throws IOException {
String converterClass = configuration.get(CARBON_CONVERTER);
if (converterClass == null) {
return new DataTypeConverterImpl();
}
try {
return (DataTypeConverter) Class.forName(converterClass).newInstance();
} catch (Exception e) {
throw new IOException(e);
}
}
public static void setDatabaseName(Configuration configuration, String databaseName) {
if (null != databaseName) {
configuration.set(DATABASE_NAME, databaseName);
}
}
public static String getDatabaseName(Configuration configuration)
throws InvalidConfigurationException {
String databaseName = configuration.get(DATABASE_NAME);
if (null == databaseName) {
throw new InvalidConfigurationException("Database name is not set.");
}
return databaseName;
}
public static void setTableName(Configuration configuration, String tableName) {
if (null != tableName) {
configuration.set(TABLE_NAME, tableName);
}
}
public static String getTableName(Configuration configuration)
throws InvalidConfigurationException {
String tableName = configuration.get(TABLE_NAME);
if (tableName == null) {
throw new InvalidConfigurationException("Table name is not set");
}
return tableName;
}
/**
* Project all Columns for carbon reader
*
* @param carbonTable
* @return String array of column names
*/
public String[] projectAllColumns(CarbonTable carbonTable) {
List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
List<String> projectColumns = new ArrayList<>();
// for complex types, add just the parent column name while skipping the child columns.
for (ColumnSchema col : colList) {
if (!col.getColumnName().contains(".")) {
projectColumns.add(col.getColumnName());
}
}
return projectColumns.toArray(new String[projectColumns.size()]);
}
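/**
 * Read the CARBON_INPUT_SEGMENTS property for the given table and, unless it is "*",
 * set those segments as the segments to access in `conf`
 */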
private static void setQuerySegmentToAccess(Configuration conf, String dbName, String tableName) {
String segmentNumbersFromProperty = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tableName, "*");
if (!segmentNumbersFromProperty.trim().equals("*")) {
CarbonInputFormat.setSegmentsToAccess(conf,
Segment.toSegmentList(segmentNumbersFromProperty.split(","), null));
}
}
/**
* Set `CARBON_INPUT_SEGMENTS` from property to configuration
*/
public static void setQuerySegment(Configuration conf, CarbonTable carbonTable) {
String tableName = carbonTable.getTableName();
setQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
}
}