| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.hadoop.api; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.List; |
| |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; |
| import org.apache.carbondata.core.datamap.DataMapChooser; |
| import org.apache.carbondata.core.datamap.DataMapLevel; |
| import org.apache.carbondata.core.datamap.Segment; |
| import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; |
| import org.apache.carbondata.core.exception.InvalidConfigurationException; |
| import org.apache.carbondata.core.indexstore.ExtendedBlocklet; |
| import org.apache.carbondata.core.indexstore.PartitionSpec; |
| import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.ColumnarFormatVersion; |
| import org.apache.carbondata.core.metadata.schema.PartitionInfo; |
| import org.apache.carbondata.core.metadata.schema.partition.PartitionType; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.TableInfo; |
| import org.apache.carbondata.core.mutate.UpdateVO; |
| import org.apache.carbondata.core.scan.expression.Expression; |
| import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; |
| import org.apache.carbondata.core.scan.model.QueryModel; |
| import org.apache.carbondata.core.scan.model.QueryModelBuilder; |
| import org.apache.carbondata.core.stats.QueryStatistic; |
| import org.apache.carbondata.core.stats.QueryStatisticsConstants; |
| import org.apache.carbondata.core.stats.QueryStatisticsRecorder; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.core.util.DataTypeConverter; |
| import org.apache.carbondata.core.util.DataTypeConverterImpl; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| import org.apache.carbondata.hadoop.CarbonInputSplit; |
| import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; |
| import org.apache.carbondata.hadoop.CarbonProjection; |
| import org.apache.carbondata.hadoop.CarbonRecordReader; |
| import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; |
| import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; |
| import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
| import org.apache.hadoop.mapreduce.security.TokenCache; |
| |
| /** |
 * Base class for carbondata input formats; there are two implementations:
| * 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support. |
| * |
| * 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support, |
| * such as segment and explicit schema metadata. |
| * |
 * @param <T> the value type produced by the record reader
| */ |
| public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { |
| // comma separated list of input segment numbers |
| public static final String INPUT_SEGMENT_NUMBERS = |
| "mapreduce.input.carboninputformat.segmentnumbers"; |
| private static final String VALIDATE_INPUT_SEGMENT_IDs = |
| "mapreduce.input.carboninputformat.validsegments"; |
  // partition id list, used by the alter table partition command
| private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; |
| private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class); |
| private static final String FILTER_PREDICATE = |
| "mapreduce.input.carboninputformat.filter.predicate"; |
| private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; |
| private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; |
| private static final String CARBON_TRANSACTIONAL_TABLE = |
| "mapreduce.input.carboninputformat.transactional"; |
| private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; |
| private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; |
| private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; |
| public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; |
| public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; |
| private static final String PARTITIONS_TO_PRUNE = |
| "mapreduce.input.carboninputformat.partitions.to.prune"; |
| private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap"; |
| |
  // record the number of segments (including stream segments) and blocks hit by the query
| protected int numSegments = 0; |
| protected int numStreamSegments = 0; |
| protected int numBlocks = 0; |
| |
| public int getNumSegments() { |
| return numSegments; |
| } |
| |
| public int getNumStreamSegments() { |
| return numStreamSegments; |
| } |
| |
| public int getNumBlocks() { |
| return numBlocks; |
| } |
| |
| /** |
| * Set the `tableInfo` in `configuration` |
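   * <p>A minimal sketch, assuming a {@code TableInfo} built or deserialized elsewhere:
   * <pre>{@code
   *   CarbonInputFormat.setTableInfo(conf, tableInfo);
   *   // subclasses can restore it later via getTableInfo(conf)
   * }</pre>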
| */ |
| public static void setTableInfo(Configuration configuration, TableInfo tableInfo) |
| throws IOException { |
| if (null != tableInfo) { |
| configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize())); |
| } |
| } |
| |
| /** |
| * Get TableInfo object from `configuration` |
| */ |
| protected static TableInfo getTableInfo(Configuration configuration) throws IOException { |
| String tableInfoStr = configuration.get(TABLE_INFO); |
| if (tableInfoStr == null) { |
| return null; |
| } else { |
| TableInfo output = new TableInfo(); |
| output.readFields(new DataInputStream( |
| new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr)))); |
| return output; |
| } |
| } |
| |
| /** |
| * Get the cached CarbonTable or create it by TableInfo in `configuration` |
| */ |
| protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration) |
| throws IOException; |
| |
| public static void setTablePath(Configuration configuration, String tablePath) { |
| configuration.set(FileInputFormat.INPUT_DIR, tablePath); |
| } |
| |
| public static void setTransactionalTable(Configuration configuration, |
| boolean isTransactionalTable) { |
| configuration.set(CARBON_TRANSACTIONAL_TABLE, String.valueOf(isTransactionalTable)); |
| } |
| |
| public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { |
| configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); |
| } |
| |
| public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob) |
| throws IOException { |
| if (dataMapJob != null) { |
| String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob); |
| configuration.set(DATA_MAP_DSTR, toString); |
| } |
| } |
| |
| public static DataMapJob getDataMapJob(Configuration configuration) throws IOException { |
| String jobString = configuration.get(DATA_MAP_DSTR); |
| if (jobString != null) { |
| return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); |
| } |
| return null; |
| } |
| |
| /** |
   * Sets the unresolved filter expression into the configuration.
| * |
   * @param configuration Hadoop configuration to carry the serialized filter
   * @param filterExpression unresolved filter expression; ignored if null
| */ |
| public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { |
| if (filterExpression == null) { |
| return; |
| } |
| try { |
| String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); |
| configuration.set(FILTER_PREDICATE, filterString); |
| } catch (Exception e) { |
| throw new RuntimeException("Error while setting filter expression to Job", e); |
| } |
| } |
| |
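  /**
   * Set the projection (columns to read) into the configuration as a comma separated string.
   * A minimal sketch, assuming columns {@code c1} and {@code c2} exist in the table:
   * <pre>{@code
   *   CarbonProjection projection = new CarbonProjection();
   *   projection.addColumn("c1");
   *   projection.addColumn("c2");
   *   CarbonInputFormat.setColumnProjection(conf, projection);
   * }</pre>
   */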
| public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { |
| if (projection == null || projection.isEmpty()) { |
| return; |
| } |
| String[] allColumns = projection.getAllColumns(); |
| StringBuilder builder = new StringBuilder(); |
| for (String column : allColumns) { |
| builder.append(column).append(","); |
| } |
| String columnString = builder.toString(); |
| columnString = columnString.substring(0, columnString.length() - 1); |
| configuration.set(COLUMN_PROJECTION, columnString); |
| } |
| |
| public static String getColumnProjection(Configuration configuration) { |
| return configuration.get(COLUMN_PROJECTION); |
| } |
| |
| public static void setFgDataMapPruning(Configuration configuration, boolean enable) { |
| configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable)); |
| } |
| |
| public static boolean isFgDataMapPruningEnable(Configuration configuration) { |
| String enable = configuration.get(FGDATAMAP_PRUNING); |
| |
    // if FGDATAMAP_PRUNING is not set, FG datamap pruning is enabled by default
| return (enable == null) || enable.equalsIgnoreCase("true"); |
| } |
| |
| /** |
| * Set list of segments to access |
| */ |
| public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) { |
| configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments)); |
| } |
| |
| /** |
   * Copy the `CARBON_INPUT_SEGMENTS` property value for the given table into `configuration`
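   * <p>For example, assuming {@code CARBON_INPUT_SEGMENTS} is the prefix
   * {@code "carbon.input.segments."}, the property below restricts queries on {@code db1.t1}
   * to segments 1 and 2:
   * <pre>{@code
   *   CarbonProperties.getInstance().addProperty("carbon.input.segments.db1.t1", "1,2");
   * }</pre>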
| */ |
| public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) { |
| String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase(); |
| String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase(); |
| String segmentNumbersFromProperty = CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*"); |
| if (!segmentNumbersFromProperty.trim().equals("*")) { |
| CarbonInputFormat.setSegmentsToAccess(conf, |
| Segment.toSegmentList(segmentNumbersFromProperty.split(","), null)); |
| } |
| } |
| |
| /** |
   * Set the given segment list into `configuration`, unless it is `*` (all segments)
| */ |
| public static void setQuerySegment(Configuration conf, String segmentList) { |
| if (!segmentList.trim().equals("*")) { |
| CarbonInputFormat |
| .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(","), null)); |
| } |
| } |
| |
| /** |
   * set whether the input segment ids should be validated
| */ |
| public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) { |
| configuration.set(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString()); |
| } |
| |
| /** |
   * return whether the input segment ids should be validated (defaults to true)
| */ |
| public static boolean getValidateSegmentsToAccess(Configuration configuration) { |
| return configuration.get(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true") |
| .equalsIgnoreCase("true"); |
| } |
| |
| /** |
| * set list of partitions to prune |
| */ |
| public static void setPartitionsToPrune(Configuration configuration, |
| List<PartitionSpec> partitions) { |
| if (partitions == null) { |
| return; |
| } |
| try { |
| String partitionString = |
| ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions)); |
| configuration.set(PARTITIONS_TO_PRUNE, partitionString); |
| } catch (Exception e) { |
| throw new RuntimeException("Error while setting patition information to Job" + partitions, e); |
| } |
| } |
| |
| /** |
| * get list of partitions to prune |
| */ |
| public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration) |
| throws IOException { |
| String partitionString = configuration.get(PARTITIONS_TO_PRUNE); |
| if (partitionString != null) { |
| return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString); |
| } |
| return null; |
| } |
| |
| public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) |
| throws IOException { |
| String tablePath = configuration.get(INPUT_DIR, ""); |
| try { |
| return AbsoluteTableIdentifier |
| .from(tablePath, getDatabaseName(configuration), getTableName(configuration)); |
| } catch (InvalidConfigurationException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
   * The configuration value of `FileInputFormat.INPUT_DIR` is used as the table path to read.
| * |
| * @param job |
| * @return List<InputSplit> list of CarbonInputSplit |
| * @throws IOException |
| */ |
| @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException; |
| |
| protected Expression getFilterPredicates(Configuration configuration) { |
| try { |
| String filterExprString = configuration.get(FILTER_PREDICATE); |
| if (filterExprString == null) { |
| return null; |
| } |
| Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); |
| return (Expression) filter; |
| } catch (IOException e) { |
| throw new RuntimeException("Error while reading filter expression", e); |
| } |
| } |
| |
| /** |
   * get data blocks of the given segments
| */ |
| protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable, |
| FilterResolverIntf resolver, BitSet matchedPartitions, List<Segment> segmentIds, |
| PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException { |
| |
| QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); |
| QueryStatistic statistic = new QueryStatistic(); |
| |
| // get tokens for all the required FileSystem for table path |
| TokenCache.obtainTokensForNamenodes(job.getCredentials(), |
| new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration()); |
| List<ExtendedBlocklet> prunedBlocklets = |
| getPrunedBlocklets(job, carbonTable, resolver, segmentIds); |
| |
    List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
| int partitionIndex = 0; |
| List<Integer> partitionIdList = new ArrayList<>(); |
| if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { |
| partitionIdList = partitionInfo.getPartitionIds(); |
| } |
| for (ExtendedBlocklet blocklet : prunedBlocklets) { |
| long partitionId = CarbonTablePath.DataFileUtil |
| .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); |
| |
      // oldPartitionIdList is only used by the alter table partition command, because it
      // changes the partition info first and then reads the data.
      // All other queries should use the newest partitionIdList.
| if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { |
| if (oldPartitionIdList != null) { |
| partitionIndex = oldPartitionIdList.indexOf((int) partitionId); |
| } else { |
| partitionIndex = partitionIdList.indexOf((int) partitionId); |
| } |
| } |
| if (partitionIndex != -1) { |
        // matchedPartitions will be null in two cases:
        // 1. the table is not a partition table
        // 2. the table is a partition table and all partitions are matched by the query
        // For a partition table, the task id in the carbondata file name is the partition id;
        // if this partition is not required, it is skipped here.
| if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { |
| CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); |
| if (inputSplit != null) { |
          resultFilteredBlocks.add(inputSplit);
| } |
| } |
| } |
| } |
| statistic |
| .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); |
| recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); |
    return resultFilteredBlocks;
| } |
| |
| /** |
| * Prune the blocklets using the filter expression with available datamaps. |
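   * <p>Pruning happens in up to three passes: first with the default blocklet datamap on the
   * driver, then with a CG (coarse-grained) datamap if one is available, and finally with an
   * FG (fine-grained) datamap when FG pruning is enabled and a DataMapJob is set.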
| */ |
| private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable, |
| FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException { |
| boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, |
| CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); |
| DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); |
| List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); |
| // First prune using default datamap on driver side. |
| DataMapExprWrapper dataMapExprWrapper = DataMapChooser.get() |
| .getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver); |
| List<ExtendedBlocklet> prunedBlocklets = |
| dataMapExprWrapper.prune(segmentIds, partitionsToPrune); |
| // Get the available CG datamaps and prune further. |
| DataMapExprWrapper cgDataMapExprWrapper = DataMapChooser.get() |
| .chooseCGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver); |
| if (cgDataMapExprWrapper != null) { |
| // Prune segments from already pruned blocklets |
| pruneSegments(segmentIds, prunedBlocklets); |
| // Again prune with CG datamap. |
| if (distributedCG && dataMapJob != null) { |
| prunedBlocklets = |
| executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob, |
| partitionsToPrune); |
| } else { |
| prunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune); |
| } |
| } |
| // Now try to prune with FG DataMap. |
| dataMapExprWrapper = DataMapChooser.get() |
| .chooseFGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver); |
| if (dataMapExprWrapper != null && dataMapExprWrapper.getDataMapType() == DataMapLevel.FG |
| && isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) { |
| // Prune segments from already pruned blocklets |
| pruneSegments(segmentIds, prunedBlocklets); |
| prunedBlocklets = |
| executeDataMapJob(carbonTable, resolver, segmentIds, dataMapExprWrapper, dataMapJob, |
| partitionsToPrune); |
| } |
| return prunedBlocklets; |
| } |
| |
| private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable, |
| FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper, |
| DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException { |
| DistributableDataMapFormat datamapDstr = |
| new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds, |
| partitionsToPrune, BlockletDataMapFactory.class.getName()); |
| List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); |
| // Apply expression on the blocklets. |
| prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); |
| return prunedBlocklets; |
| } |
| |
| /** |
| * Prune the segments from the already pruned blocklets. |
| * @param segments |
| * @param prunedBlocklets |
| */ |
| private void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) { |
| List<Segment> toBeRemovedSegments = new ArrayList<>(); |
| for (Segment segment : segments) { |
| boolean found = false; |
| // Clear the old pruned index files if any present |
| segment.getFilteredIndexShardNames().clear(); |
| // Check the segment exist in any of the pruned blocklets. |
| for (ExtendedBlocklet blocklet : prunedBlocklets) { |
| if (blocklet.getSegmentId().equals(segment.getSegmentNo())) { |
| found = true; |
| // Set the pruned index file to the segment for further pruning. |
| String uniqueTaskName = CarbonTablePath.getUniqueTaskName(blocklet.getTaskName()); |
| segment.setFilteredIndexShardName(uniqueTaskName); |
| } |
| } |
      // Mark the segment for removal if it does not appear in any pruned blocklet.
| if (!found) { |
| toBeRemovedSegments.add(segment); |
| } |
| } |
    // Remove all segments that no longer appear in the pruned blocklets
| segments.removeAll(toBeRemovedSegments); |
| } |
| |
| private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException { |
| CarbonInputSplit split = CarbonInputSplit |
| .from(blocklet.getSegmentId(), blocklet.getBlockletId(), |
| new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(), |
| blocklet.getLocations()), |
| ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), |
| blocklet.getDataMapWriterPath()); |
| split.setDetailInfo(blocklet.getDetailInfo()); |
| return split; |
| } |
| |
| @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, |
| TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { |
| Configuration configuration = taskAttemptContext.getConfiguration(); |
| QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); |
| CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); |
| return new CarbonRecordReader<T>(queryModel, readSupport); |
| } |
| |
| public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) |
| throws IOException { |
| Configuration configuration = taskAttemptContext.getConfiguration(); |
| CarbonTable carbonTable = getOrCreateCarbonTable(configuration); |
| |
| // set projection column in the query model |
| String projectionString = getColumnProjection(configuration); |
| String[] projectColumns; |
| if (projectionString != null) { |
| projectColumns = projectionString.split(","); |
| } else { |
| projectColumns = new String[]{}; |
| } |
| QueryModel queryModel = new QueryModelBuilder(carbonTable) |
| .projectColumns(projectColumns) |
| .filterExpression(getFilterPredicates(configuration)) |
| .dataConverter(getDataTypeConverter(configuration)) |
| .build(); |
| |
    // update the file level index store if there are invalid segments
| if (inputSplit instanceof CarbonMultiBlockSplit) { |
| CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; |
| List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); |
| if (invalidSegments.size() > 0) { |
| queryModel.setInvalidSegmentIds(invalidSegments); |
| } |
| List<UpdateVO> invalidTimestampRangeList = |
| split.getAllSplits().get(0).getInvalidTimestampRange(); |
| if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { |
| queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); |
| } |
| } |
| return queryModel; |
| } |
| |
| public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { |
| String readSupportClass = configuration.get(CARBON_READ_SUPPORT); |
    // By default, the dictionary decode read support class is used
| CarbonReadSupport<T> readSupport = null; |
| if (readSupportClass != null) { |
| try { |
| Class<?> myClass = Class.forName(readSupportClass); |
| Constructor<?> constructor = myClass.getConstructors()[0]; |
| Object object = constructor.newInstance(); |
| if (object instanceof CarbonReadSupport) { |
| readSupport = (CarbonReadSupport) object; |
| } |
| } catch (ClassNotFoundException ex) { |
| LOG.error("Class " + readSupportClass + "not found", ex); |
| } catch (Exception ex) { |
| LOG.error("Error while creating " + readSupportClass, ex); |
| } |
| } else { |
| readSupport = new DictionaryDecodeReadSupport<>(); |
| } |
| return readSupport; |
| } |
| |
| @Override protected boolean isSplitable(JobContext context, Path filename) { |
| try { |
| // Don't split the file if it is local file system |
| FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); |
| if (fileSystem instanceof LocalFileSystem) { |
| return false; |
| } |
| } catch (Exception e) { |
| return true; |
| } |
| return true; |
| } |
| |
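  /**
   * Set a custom {@link CarbonReadSupport} implementation; it is instantiated reflectively in
   * {@code getReadSupportClass}. A minimal sketch, assuming a user-defined
   * {@code MyReadSupport} with a public no-arg constructor (hypothetical class):
   * <pre>{@code
   *   CarbonInputFormat.setCarbonReadSupport(conf, MyReadSupport.class);
   * }</pre>
   */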
| public static void setCarbonReadSupport(Configuration configuration, |
| Class<? extends CarbonReadSupport> readSupportClass) { |
| if (readSupportClass != null) { |
| configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName()); |
| } |
| } |
| |
| /** |
   * Setting a converter is optional; if the user does not set one, `DataTypeConverterImpl`
   * is used by default
| * |
| * @param configuration |
   * @param converterClass the data type converter for a specific compute engine
| */ |
| public static void setDataTypeConverter( |
| Configuration configuration, Class<? extends DataTypeConverter> converterClass) { |
| if (null != converterClass) { |
| configuration.set(CARBON_CONVERTER, converterClass.getCanonicalName()); |
| } |
| } |
| |
| public static DataTypeConverter getDataTypeConverter(Configuration configuration) |
| throws IOException { |
| String converterClass = configuration.get(CARBON_CONVERTER); |
| if (converterClass == null) { |
| return new DataTypeConverterImpl(); |
| } |
| |
| try { |
| return (DataTypeConverter) Class.forName(converterClass).newInstance(); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| |
| public static void setDatabaseName(Configuration configuration, String databaseName) { |
| if (null != databaseName) { |
| configuration.set(DATABASE_NAME, databaseName); |
| } |
| } |
| |
| public static String getDatabaseName(Configuration configuration) |
| throws InvalidConfigurationException { |
    String databaseName = configuration.get(DATABASE_NAME);
    if (null == databaseName) {
      throw new InvalidConfigurationException("Database name is not set.");
    }
    return databaseName;
| } |
| |
| public static void setTableName(Configuration configuration, String tableName) { |
| if (null != tableName) { |
| configuration.set(TABLE_NAME, tableName); |
| } |
| } |
| |
| public static String getTableName(Configuration configuration) |
| throws InvalidConfigurationException { |
| String tableName = configuration.get(TABLE_NAME); |
| if (tableName == null) { |
| throw new InvalidConfigurationException("Table name is not set"); |
| } |
| return tableName; |
| } |
| |
| public static void setAccessStreamingSegments(Configuration configuration, Boolean validate) |
| throws InvalidConfigurationException { |
| configuration.set( |
| CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + "." + getDatabaseName( |
| configuration) + "." + getTableName(configuration), validate.toString()); |
| } |
| |
| public static boolean getAccessStreamingSegments(Configuration configuration) { |
| try { |
| return configuration.get( |
| CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + "." + getDatabaseName( |
| configuration) + "." + getTableName( |
| configuration), "false").equalsIgnoreCase("true"); |
| |
| } catch (InvalidConfigurationException e) { |
| return false; |
| } |
| } |
| } |