| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.hadoop.api; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.BitSet; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.datamap.DataMapStoreManager; |
| import org.apache.carbondata.core.datamap.TableDataMap; |
| import org.apache.carbondata.core.indexstore.Blocklet; |
| import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; |
| import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; |
| import org.apache.carbondata.core.keygenerator.KeyGenException; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.ColumnarFormatVersion; |
| import org.apache.carbondata.core.metadata.schema.PartitionInfo; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.TableInfo; |
| import org.apache.carbondata.core.mutate.CarbonUpdateUtil; |
| import org.apache.carbondata.core.mutate.SegmentUpdateDetails; |
| import org.apache.carbondata.core.mutate.UpdateVO; |
| import org.apache.carbondata.core.mutate.data.BlockMappingVO; |
| import org.apache.carbondata.core.scan.expression.Expression; |
| import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; |
| import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; |
| import org.apache.carbondata.core.scan.model.CarbonQueryPlan; |
| import org.apache.carbondata.core.scan.model.QueryModel; |
| import org.apache.carbondata.core.stats.QueryStatistic; |
| import org.apache.carbondata.core.stats.QueryStatisticsConstants; |
| import org.apache.carbondata.core.stats.QueryStatisticsRecorder; |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager; |
| import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; |
| import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.core.util.DataTypeConverter; |
| import org.apache.carbondata.core.util.DataTypeConverterImpl; |
| import org.apache.carbondata.core.util.path.CarbonStorePath; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| import org.apache.carbondata.hadoop.CarbonInputSplit; |
| import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; |
| import org.apache.carbondata.hadoop.CarbonProjection; |
| import org.apache.carbondata.hadoop.CarbonRecordReader; |
| import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; |
| import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; |
| import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; |
| import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; |
| import org.apache.carbondata.hadoop.util.SchemaReader; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.InvalidPathException; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
| import org.apache.hadoop.mapreduce.security.TokenCache; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * Input format of CarbonData file. |
| * |
 * @param <T> value type produced by the record reader
| */ |
| public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { |
| |
| // comma separated list of input segment numbers |
| public static final String INPUT_SEGMENT_NUMBERS = |
| "mapreduce.input.carboninputformat.segmentnumbers"; |
| // comma separated list of input files |
| public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; |
| public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; |
| private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class); |
| private static final String FILTER_PREDICATE = |
| "mapreduce.input.carboninputformat.filter.predicate"; |
| private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; |
| private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; |
| private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; |
| private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; |
| |
  // cached CarbonTable; it will be reused on the task side
| private CarbonTable carbonTable; |
| |
| /** |
| * Set the `tableInfo` in `configuration` |
| */ |
| public static void setTableInfo(Configuration configuration, TableInfo tableInfo) |
| throws IOException { |
| if (null != tableInfo) { |
| configuration.set(TABLE_INFO, ObjectSerializationUtil.encodeToString(tableInfo.serialize())); |
| } |
| } |
| |
| /** |
| * Get TableInfo object from `configuration` |
| */ |
| private TableInfo getTableInfo(Configuration configuration) throws IOException { |
| String tableInfoStr = configuration.get(TABLE_INFO); |
| if (tableInfoStr == null) { |
| return null; |
| } else { |
| TableInfo output = new TableInfo(); |
| output.readFields( |
| new DataInputStream( |
| new ByteArrayInputStream(ObjectSerializationUtil.decodeStringToBytes(tableInfoStr)))); |
| return output; |
| } |
| } |
| |
| /** |
   * Get the cached CarbonTable, or create it from the TableInfo in `configuration`
| */ |
| private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { |
    if (carbonTable == null) {
      // the carbon table is created either from the deserialized table info (schema saved in
      // the Hive metastore) or by reading the schema file from HDFS (schema saved in HDFS)
      TableInfo tableInfo = getTableInfo(configuration);
      if (tableInfo != null) {
        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
      } else {
        carbonTable = SchemaReader.readCarbonTableFromStore(
            getAbsoluteTableIdentifier(configuration));
      }
    }
    return carbonTable;
| } |
| |
| public static void setTablePath(Configuration configuration, String tablePath) |
| throws IOException { |
| configuration.set(FileInputFormat.INPUT_DIR, tablePath); |
| } |
| |
| public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { |
| configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); |
| } |
| |
| /** |
   * Sets the unresolved filter expression into the job configuration.
   *
   * @param configuration job configuration
   * @param filterExpression filter to be applied during the scan; ignored if null
| */ |
| public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { |
| if (filterExpression == null) { |
| return; |
| } |
| try { |
| String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); |
| configuration.set(FILTER_PREDICATE, filterString); |
| } catch (Exception e) { |
| throw new RuntimeException("Error while setting filter expression to Job", e); |
| } |
| } |
| |
| public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { |
| if (projection == null || projection.isEmpty()) { |
| return; |
| } |
| String[] allColumns = projection.getAllColumns(); |
| StringBuilder builder = new StringBuilder(); |
| for (String column : allColumns) { |
| builder.append(column).append(","); |
| } |
| String columnString = builder.toString(); |
| columnString = columnString.substring(0, columnString.length() - 1); |
| configuration.set(COLUMN_PROJECTION, columnString); |
| } |
| |
| public static String getColumnProjection(Configuration configuration) { |
| return configuration.get(COLUMN_PROJECTION); |
| } |
| |
| public static void setCarbonReadSupport(Configuration configuration, |
| Class<? extends CarbonReadSupport> readSupportClass) { |
| if (readSupportClass != null) { |
| configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName()); |
| } |
| } |
| |
| private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) { |
| return CarbonStorePath.getCarbonTablePath(absIdentifier); |
| } |
| |
| /** |
| * Set list of segments to access |
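   * <p>For example, to restrict a read to specific segments (the segment ids and the
   * {@code conf} variable are illustrative):
   * <pre>{@code
   *   CarbonTableInputFormat.setSegmentsToAccess(conf, Arrays.asList("0", "1"));
   * }</pre>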
| */ |
| public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) { |
| configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments)); |
| } |
| |
| /** |
| * Set list of files to access |
| */ |
| public static void setFilesToAccess(Configuration configuration, List<String> validFiles) { |
| configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); |
| } |
| |
| private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) |
| throws IOException { |
| String dirs = configuration.get(INPUT_DIR, ""); |
| String[] inputPaths = StringUtils.split(dirs); |
| if (inputPaths.length == 0) { |
| throw new InvalidPathException("No input paths specified in job"); |
| } |
| return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); |
| } |
| |
| /** |
| * {@inheritDoc} |
   * The configuration FileInputFormat.INPUT_DIR is used to get the table path to read.
| * |
| * @param job |
| * @return List<InputSplit> list of CarbonInputSplit |
| * @throws IOException |
| */ |
| @Override public List<InputSplit> getSplits(JobContext job) throws IOException { |
| AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); |
| TableDataMap blockletMap = |
| DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME, |
| BlockletDataMapFactory.class); |
| List<String> invalidSegments = new ArrayList<>(); |
| List<UpdateVO> invalidTimestampsList = new ArrayList<>(); |
| List<String> validSegments = Arrays.asList(getSegmentsToAccess(job)); |
    // if no input segments are configured, read all valid segments and handle the invalid ones
| if (validSegments.size() == 0) { |
| SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); |
| SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = |
| segmentStatusManager.getValidAndInvalidSegments(); |
| SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); |
| validSegments = segments.getValidSegments(); |
| if (validSegments.size() == 0) { |
| return new ArrayList<>(0); |
| } |
| |
      // collect invalid segments and clear their entries from the blocklet index
| invalidSegments.addAll(segments.getInvalidSegments()); |
| for (String invalidSegmentId : invalidSegments) { |
| invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId)); |
| } |
| if (invalidSegments.size() > 0) { |
| blockletMap.clear(invalidSegments); |
| } |
| } |
| |
| // process and resolve the expression |
| Expression filter = getFilterPredicates(job.getConfiguration()); |
| CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); |
| // this will be null in case of corrupt schema file. |
| if (null == carbonTable) { |
| throw new IOException("Missing/Corrupt schema file for table."); |
| } |
| PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); |
| CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); |
| |
| // prune partitions for filter query on partition table |
| BitSet matchedPartitions = null; |
| if (partitionInfo != null) { |
| matchedPartitions = setMatchedPartitions(null, filter, partitionInfo); |
| if (matchedPartitions != null) { |
| if (matchedPartitions.cardinality() == 0) { |
| return new ArrayList<InputSplit>(); |
| } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) { |
| matchedPartitions = null; |
| } |
| } |
| } |
| |
| FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); |
| |
    // do block filtering and get splits
| List<InputSplit> splits = |
| getSplits(job, filterInterface, validSegments, matchedPartitions, partitionInfo, null); |
    // pass the invalid segments to the task side so their index entries can be removed there
    if (invalidSegments.size() > 0) {
      for (InputSplit split : splits) {
        ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
        ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
| } |
| } |
| return splits; |
| } |
| |
| /** |
   * Get the splits of one segment. Used by the alter table partition statement.
   * @param job
   * @param targetSegment
   * @param oldPartitionIdList list of partition ids as they were before partitionInfo was changed
   * @param partitionInfo partition info of the table
   * @return list of CarbonInputSplit
| * @throws IOException |
| */ |
| public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment, |
| List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) |
| throws IOException { |
| AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); |
| List<String> invalidSegments = new ArrayList<>(); |
| List<UpdateVO> invalidTimestampsList = new ArrayList<>(); |
| |
| List<String> segmentList = new ArrayList<>(); |
| segmentList.add(targetSegment); |
| setSegmentsToAccess(job.getConfiguration(), segmentList); |
    try {
| // process and resolve the expression |
| Expression filter = getFilterPredicates(job.getConfiguration()); |
| CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); |
| // this will be null in case of corrupt schema file. |
| if (null == carbonTable) { |
| throw new IOException("Missing/Corrupt schema file for table."); |
| } |
| |
| CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); |
| |
| // prune partitions for filter query on partition table |
| String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID); |
| BitSet matchedPartitions = null; |
| if (partitionInfo != null) { |
| matchedPartitions = setMatchedPartitions(partitionIds, filter, partitionInfo); |
| if (matchedPartitions != null) { |
| if (matchedPartitions.cardinality() == 0) { |
| return new ArrayList<InputSplit>(); |
| } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) { |
| matchedPartitions = null; |
| } |
| } |
| } |
| |
| FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); |
      // do block filtering and get splits
| List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions, |
| partitionInfo, oldPartitionIdList); |
      // pass the invalid segments to the task side so their index entries can be removed there
| if (invalidSegments.size() > 0) { |
| for (InputSplit split : splits) { |
| ((CarbonInputSplit) split).setInvalidSegments(invalidSegments); |
| ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList); |
| } |
| } |
| return splits; |
| } catch (IOException e) { |
| throw new RuntimeException("Can't get splits of the target segment ", e); |
| } |
| } |
| |
| private BitSet setMatchedPartitions(String partitionIds, Expression filter, |
| PartitionInfo partitionInfo) { |
| BitSet matchedPartitions = null; |
| if (null != partitionIds) { |
| String[] partList = partitionIds.replace("[", "").replace("]", "").split(","); |
| // only one partitionId in current alter table statement |
| matchedPartitions = new BitSet(Integer.parseInt(partList[0])); |
| for (String partitionId : partList) { |
| matchedPartitions.set(Integer.parseInt(partitionId)); |
| } |
| } else { |
| if (null != filter) { |
| matchedPartitions = |
| new FilterExpressionProcessor().getFilteredPartitions(filter, partitionInfo); |
| } |
| } |
| return matchedPartitions; |
| } |
| /** |
| * {@inheritDoc} |
   * The configurations FileInputFormat.INPUT_DIR and CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
   * are used to get the table path and the segments to read.
   *
   * @return list of CarbonInputSplit
| * @throws IOException |
| */ |
| private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, |
| List<String> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo, |
| List<Integer> oldPartitionIdList) throws IOException { |
| |
| List<InputSplit> result = new LinkedList<InputSplit>(); |
| UpdateVO invalidBlockVOForSegmentId = null; |
    boolean isIUDTable = false;
| |
| AbsoluteTableIdentifier absoluteTableIdentifier = |
| getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); |
| SegmentUpdateStatusManager updateStatusManager = |
| new SegmentUpdateStatusManager(absoluteTableIdentifier); |
| |
| isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); |
| |
    // for each segment, fetch the blocks matching the filter using the driver-side blocklet index
    List<CarbonInputSplit> dataBlocksOfSegment =
        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
            validSegments, partitionInfo, oldPartitionIdList);
    for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
      // for tables on which IUD (insert/update/delete) operations have been performed, skip
      // blocks invalidated by those operations; non-IUD tables skip this check entirely
      if (isIUDTable) {
        invalidBlockVOForSegmentId =
            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
        if (CarbonUtil
            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
                invalidBlockVOForSegmentId, updateStatusManager)) {
          continue;
        }
      }
| String[] deleteDeltaFilePath = null; |
| try { |
| deleteDeltaFilePath = |
| updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString()); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath); |
| result.add(inputSplit); |
| } |
| return result; |
| } |
| |
| protected Expression getFilterPredicates(Configuration configuration) { |
| try { |
| String filterExprString = configuration.get(FILTER_PREDICATE); |
| if (filterExprString == null) { |
| return null; |
| } |
| Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); |
| return (Expression) filter; |
| } catch (IOException e) { |
| throw new RuntimeException("Error while reading filter expression", e); |
| } |
| } |
| |
| /** |
   * Get data blocks of the given segments.
| */ |
  private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
| AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, |
| BitSet matchedPartitions, List<String> segmentIds, PartitionInfo partitionInfo, |
| List<Integer> oldPartitionIdList) throws IOException { |
| |
| QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); |
| QueryStatistic statistic = new QueryStatistic(); |
| |
    // obtain delegation tokens for all the FileSystems required by the table path
| TokenCache.obtainTokensForNamenodes(job.getCredentials(), |
| new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); |
| |
| TableDataMap blockletMap = DataMapStoreManager.getInstance() |
| .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class); |
| List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver); |
| |
    List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
| int partitionIndex = 0; |
| List<Integer> partitionIdList = new ArrayList<>(); |
| if (partitionInfo != null) { |
| partitionIdList = partitionInfo.getPartitionIds(); |
| } |
| for (Blocklet blocklet : prunedBlocklets) { |
| int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( |
| CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString())); |
| |
      // oldPartitionIdList is only used by the alter table partition command because it changes
      // the partition info first and then reads the data.
      // Other queries should use the newest partitionIdList.
| if (partitionInfo != null) { |
| if (oldPartitionIdList != null) { |
| partitionIndex = oldPartitionIdList.indexOf(partitionId); |
| } else { |
| partitionIndex = partitionIdList.indexOf(partitionId); |
| } |
| } |
| if (partitionIndex != -1) { |
        // matchedPartitions will be null in two cases:
        // 1. the table is not a partition table
        // 2. the table is a partition table and all partitions are matched by the query
        // For a partition table, the task id in the carbondata file name is the partition id,
        // so partitions that are not required are skipped here.
        if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
          resultFilteredBlocks.add(convertToCarbonInputSplit(blocklet));
        }
| } |
| } |
| statistic |
| .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); |
| recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); |
    return resultFilteredBlocks;
| } |
| |
  private CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet) throws IOException {
    blocklet.updateLocations();
    CarbonInputSplit split =
        CarbonInputSplit.from(blocklet.getSegmentId(),
            new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
| split.setDetailInfo(blocklet.getDetailInfo()); |
| return split; |
| } |
| |
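  /**
   * Creates a {@link CarbonRecordReader} for the given split, using the configured
   * {@link CarbonReadSupport}.
   * <p>A task-side consumption sketch (the {@code format}, {@code split} and {@code context}
   * instances are assumed to be supplied by the MapReduce framework or the caller):
   * <pre>{@code
   *   RecordReader<Void, T> reader = format.createRecordReader(split, context);
   *   reader.initialize(split, context);
   *   while (reader.nextKeyValue()) {
   *     T row = reader.getCurrentValue();
   *   }
   *   reader.close();
   * }</pre>
   */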
| @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, |
| TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { |
| Configuration configuration = taskAttemptContext.getConfiguration(); |
| QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext); |
| CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); |
| return new CarbonRecordReader<T>(queryModel, readSupport); |
| } |
| |
| public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) |
| throws IOException { |
| Configuration configuration = taskAttemptContext.getConfiguration(); |
| CarbonTable carbonTable = getOrCreateCarbonTable(configuration); |
| // getting the table absoluteTableIdentifier from the carbonTable |
| // to avoid unnecessary deserialization |
| AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); |
| |
    // the query plan includes the projection columns
| String projection = getColumnProjection(configuration); |
| CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection); |
| QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable, |
| getDataTypeConverter(configuration)); |
| |
| // set the filter to the query model in order to filter blocklet before scan |
| Expression filter = getFilterPredicates(configuration); |
| CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); |
| FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier); |
| queryModel.setFilterExpressionResolverTree(filterIntf); |
| |
    // update the file level index store if there are invalid segments
| if (inputSplit instanceof CarbonMultiBlockSplit) { |
| CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; |
| List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); |
| if (invalidSegments.size() > 0) { |
| queryModel.setInvalidSegmentIds(invalidSegments); |
| } |
| List<UpdateVO> invalidTimestampRangeList = |
| split.getAllSplits().get(0).getInvalidTimestampRange(); |
| if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { |
| queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); |
| } |
| } |
| return queryModel; |
| } |
| |
| public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { |
| String readSupportClass = configuration.get(CARBON_READ_SUPPORT); |
    // by default, the dictionary decode read support is used
| CarbonReadSupport<T> readSupport = null; |
| if (readSupportClass != null) { |
| try { |
| Class<?> myClass = Class.forName(readSupportClass); |
| Constructor<?> constructor = myClass.getConstructors()[0]; |
| Object object = constructor.newInstance(); |
| if (object instanceof CarbonReadSupport) { |
| readSupport = (CarbonReadSupport) object; |
| } |
| } catch (ClassNotFoundException ex) { |
| LOG.error("Class " + readSupportClass + "not found", ex); |
| } catch (Exception ex) { |
| LOG.error("Error while creating " + readSupportClass, ex); |
| } |
| } else { |
| readSupport = new DictionaryDecodeReadSupport<>(); |
| } |
| return readSupport; |
| } |
| |
| @Override protected boolean isSplitable(JobContext context, Path filename) { |
| try { |
| // Don't split the file if it is local file system |
| FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); |
| if (fileSystem instanceof LocalFileSystem) { |
| return false; |
| } |
| } catch (Exception e) { |
| return true; |
| } |
| return true; |
| } |
| |
| /** |
| * required to be moved to core |
| * |
| * @return updateExtension |
| */ |
| private String getUpdateExtension() { |
| // TODO: required to modify when supporting update, mostly will be update timestamp |
| return "update"; |
| } |
| |
| /** |
   * Return the segments configured to be accessed; empty if none are set.
| */ |
| private String[] getSegmentsToAccess(JobContext job) { |
| String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); |
| if (segmentString.trim().isEmpty()) { |
| return new String[0]; |
| } |
| return segmentString.split(","); |
| } |
| |
| /** |
   * Get the row count of each block, and the mapping of segment id to block count.
| * |
| * @param job |
| * @param identifier |
| * @return |
| * @throws IOException |
| * @throws KeyGenException |
| */ |
| public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier) |
| throws IOException, KeyGenException { |
| TableDataMap blockletMap = DataMapStoreManager.getInstance() |
| .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class); |
| SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); |
| SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = |
| new SegmentStatusManager(identifier).getValidAndInvalidSegments(); |
| Map<String, Long> blockRowCountMapping = |
| new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); |
| Map<String, Long> segmentAndBlockCountMapping = |
| new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); |
| List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null); |
| for (Blocklet blocklet : blocklets) { |
| String blockName = blocklet.getPath().toString(); |
| blockName = CarbonTablePath.getCarbonDataFileName(blockName); |
| blockName = blockName + CarbonTablePath.getCarbonDataExtension(); |
| |
| long rowCount = blocklet.getDetailInfo().getRowCount(); |
| |
| String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName); |
| |
      // if the block is invalid then don't add the count
| SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key); |
| |
| if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) { |
| Long blockCount = blockRowCountMapping.get(key); |
| if (blockCount == null) { |
| blockCount = 0L; |
| Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId()); |
| if (count == null) { |
| count = 0L; |
| } |
| segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1); |
| } |
| blockCount += rowCount; |
| blockRowCountMapping.put(key, blockCount); |
| } |
| } |
| return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping); |
| } |
| |
| /** |
   * Optional: if the user does not set a converter, {@link DataTypeConverterImpl} is used
   * by default.
| * |
| * @param configuration |
   * @param converter data type converter for the specific compute engine
| * @throws IOException |
| */ |
| public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter) |
| throws IOException { |
| if (null != converter) { |
| configuration.set(CARBON_CONVERTER, |
| ObjectSerializationUtil.convertObjectToString(converter)); |
| } |
| } |
| |
| public static DataTypeConverter getDataTypeConverter(Configuration configuration) |
| throws IOException { |
| String converter = configuration.get(CARBON_CONVERTER); |
| if (converter == null) { |
| return new DataTypeConverterImpl(); |
| } |
| return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter); |
| } |
| } |