/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.api;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
/**
* Input format of CarbonData file.
*
* @param <T>
*/
public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
  // comma-separated list of input segment numbers
public static final String INPUT_SEGMENT_NUMBERS =
"mapreduce.input.carboninputformat.segmentnumbers";
  // comma-separated list of input files
public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
  // a cache for the carbon table; it will be used on the task side
private CarbonTable carbonTable;
/**
* Set the `tableInfo` in `configuration`
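   *
   * <p>A minimal usage sketch ({@code conf} and {@code tableInfo} are illustrative names,
   * not part of this API):
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   TableInfo tableInfo = ...; // e.g. deserialized from the hive metastore
   *   CarbonTableInputFormat.setTableInfo(conf, tableInfo);
   * }</pre>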
*/
public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
throws IOException {
if (null != tableInfo) {
configuration.set(TABLE_INFO, ObjectSerializationUtil.encodeToString(tableInfo.serialize()));
}
}
/**
* Get TableInfo object from `configuration`
*/
private TableInfo getTableInfo(Configuration configuration) throws IOException {
String tableInfoStr = configuration.get(TABLE_INFO);
if (tableInfoStr == null) {
return null;
} else {
TableInfo output = new TableInfo();
output.readFields(
new DataInputStream(
new ByteArrayInputStream(ObjectSerializationUtil.decodeStringToBytes(tableInfoStr))));
return output;
}
}
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    if (carbonTable == null) {
      // the carbon table should be created either from the deserialized table info
      // (schema saved in the hive metastore) or by reading the schema file in HDFS
      // (schema saved in HDFS)
      TableInfo tableInfo = getTableInfo(configuration);
      if (tableInfo != null) {
        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
      } else {
        carbonTable = SchemaReader.readCarbonTableFromStore(
            getAbsoluteTableIdentifier(configuration));
      }
    }
    return carbonTable;
  }
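  /**
   * Set the table path; it is stored under FileInputFormat.INPUT_DIR.
   *
   * <p>A minimal sketch, assuming the usual {@code <storePath>/<database>/<table>} layout
   * (the path and {@code conf} are illustrative):
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   CarbonTableInputFormat.setTablePath(conf, "hdfs://localhost:9000/carbonstore/default/t1");
   * }</pre>
   */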
public static void setTablePath(Configuration configuration, String tablePath)
throws IOException {
configuration.set(FileInputFormat.INPUT_DIR, tablePath);
}
public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
}
/**
   * Set the unresolved filter expression; it is serialized into the job configuration and
   * resolved on the task side.
*
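   * <p>A minimal sketch, assuming the conditional expression classes under
   * {@code org.apache.carbondata.core.scan.expression} and an enum-style
   * {@code DataType.STRING} ({@code conf} and the column name are illustrative):
   * <pre>{@code
   *   Expression filter = new EqualToExpression(
   *       new ColumnExpression("city", DataType.STRING),
   *       new LiteralExpression("shanghai", DataType.STRING));
   *   CarbonTableInputFormat.setFilterPredicates(conf, filter);
   * }</pre>
   *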
* @param configuration
* @param filterExpression
*/
public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
if (filterExpression == null) {
return;
}
try {
String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
configuration.set(FILTER_PREDICATE, filterString);
} catch (Exception e) {
throw new RuntimeException("Error while setting filter expression to Job", e);
}
}
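  /**
   * Set the column projection; the column names are stored as a comma-separated string.
   *
   * <p>A minimal sketch ({@code conf} and the column names are illustrative):
   * <pre>{@code
   *   CarbonProjection projection = new CarbonProjection();
   *   projection.addColumn("name");
   *   projection.addColumn("age");
   *   CarbonTableInputFormat.setColumnProjection(conf, projection);
   * }</pre>
   */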
public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
if (projection == null || projection.isEmpty()) {
return;
}
String[] allColumns = projection.getAllColumns();
StringBuilder builder = new StringBuilder();
for (String column : allColumns) {
builder.append(column).append(",");
}
String columnString = builder.toString();
columnString = columnString.substring(0, columnString.length() - 1);
configuration.set(COLUMN_PROJECTION, columnString);
}
public static String getColumnProjection(Configuration configuration) {
return configuration.get(COLUMN_PROJECTION);
}
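  /**
   * Set the read support class used to convert scanned rows on the task side.
   *
   * <p>A minimal sketch, using the default decoder shipped with this module
   * ({@code conf} is illustrative):
   * <pre>{@code
   *   CarbonTableInputFormat.setCarbonReadSupport(conf, DictionaryDecodeReadSupport.class);
   * }</pre>
   */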
public static void setCarbonReadSupport(Configuration configuration,
Class<? extends CarbonReadSupport> readSupportClass) {
if (readSupportClass != null) {
configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
}
}
private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
return CarbonStorePath.getCarbonTablePath(absIdentifier);
}
/**
* Set list of segments to access
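   *
   * <p>A minimal sketch ({@code conf} and the segment ids are illustrative):
   * <pre>{@code
   *   CarbonTableInputFormat.setSegmentsToAccess(conf, Arrays.asList("0", "1"));
   * }</pre>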
*/
public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
}
/**
* Set list of files to access
*/
public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
}
private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
String dirs = configuration.get(INPUT_DIR, "");
String[] inputPaths = StringUtils.split(dirs);
if (inputPaths.length == 0) {
throw new InvalidPathException("No input paths specified in job");
}
return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
}
/**
* {@inheritDoc}
   * The configuration FileInputFormat.INPUT_DIR is used to get the table path to read.
*
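   * <p>A minimal driver-side sketch ({@code job} and the table path are illustrative):
   * <pre>{@code
   *   Job job = Job.getInstance(new Configuration());
   *   CarbonTableInputFormat.setTablePath(job.getConfiguration(),
   *       "hdfs://localhost:9000/carbonstore/default/t1");
   *   CarbonTableInputFormat<Object[]> format = new CarbonTableInputFormat<>();
   *   List<InputSplit> splits = format.getSplits(job);
   * }</pre>
   *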
* @param job
* @return List<InputSplit> list of CarbonInputSplit
* @throws IOException
*/
@Override public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
TableDataMap blockletMap =
DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
BlockletDataMapFactory.class);
List<String> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
    // if no segments are specified, get all valid segments of the table
if (validSegments.size() == 0) {
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
segmentStatusManager.getValidAndInvalidSegments();
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
validSegments = segments.getValidSegments();
if (validSegments.size() == 0) {
return new ArrayList<>(0);
}
// remove entry in the segment index if there are invalid segments
invalidSegments.addAll(segments.getInvalidSegments());
for (String invalidSegmentId : invalidSegments) {
invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
}
if (invalidSegments.size() > 0) {
blockletMap.clear(invalidSegments);
}
}
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
// this will be null in case of corrupt schema file.
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
// prune partitions for filter query on partition table
BitSet matchedPartitions = null;
if (partitionInfo != null) {
matchedPartitions = setMatchedPartitions(null, filter, partitionInfo);
if (matchedPartitions != null) {
if (matchedPartitions.cardinality() == 0) {
return new ArrayList<InputSplit>();
} else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) {
matchedPartitions = null;
}
}
}
FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
// do block filtering and get split
List<InputSplit> splits =
getSplits(job, filterInterface, validSegments, matchedPartitions, partitionInfo, null);
    // pass the invalid segments to the task side so that their index entries can be removed there
if (invalidSegments.size() > 0) {
for (InputSplit split : splits) {
        ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
        ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
}
}
return splits;
}
/**
   * Read data in one segment; used by the alter table partition statement.
   * @param job
   * @param targetSegment
   * @param oldPartitionIdList the old partition ids, captured before partitionInfo was changed
* @return
* @throws IOException
*/
public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment,
List<Integer> oldPartitionIdList, PartitionInfo partitionInfo)
throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
List<String> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<String> segmentList = new ArrayList<>();
segmentList.add(targetSegment);
setSegmentsToAccess(job.getConfiguration(), segmentList);
try {
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
// this will be null in case of corrupt schema file.
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
// prune partitions for filter query on partition table
String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
BitSet matchedPartitions = null;
if (partitionInfo != null) {
matchedPartitions = setMatchedPartitions(partitionIds, filter, partitionInfo);
if (matchedPartitions != null) {
if (matchedPartitions.cardinality() == 0) {
return new ArrayList<InputSplit>();
} else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) {
matchedPartitions = null;
}
}
}
FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
// do block filtering and get split
List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions,
partitionInfo, oldPartitionIdList);
      // pass the invalid segments to the task side so that their index entries can be removed there
if (invalidSegments.size() > 0) {
for (InputSplit split : splits) {
((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
}
}
return splits;
} catch (IOException e) {
throw new RuntimeException("Can't get splits of the target segment ", e);
}
}
private BitSet setMatchedPartitions(String partitionIds, Expression filter,
PartitionInfo partitionInfo) {
BitSet matchedPartitions = null;
if (null != partitionIds) {
      String[] partList = partitionIds.replace("[", "").replace("]", "").split(",");
      // currently only one partitionId is passed in by the alter table statement;
      // trim each token because List.toString() inserts a space after every comma
      matchedPartitions = new BitSet(Integer.parseInt(partList[0].trim()));
      for (String partitionId : partList) {
        matchedPartitions.set(Integer.parseInt(partitionId.trim()));
      }
} else {
if (null != filter) {
matchedPartitions =
new FilterExpressionProcessor().getFilteredPartitions(filter, partitionInfo);
}
}
return matchedPartitions;
}
/**
* {@inheritDoc}
   * The configurations FileInputFormat.INPUT_DIR and CarbonInputFormat.INPUT_SEGMENT_NUMBERS
   * are used to get the table path and the segments to read.
*
* @return
* @throws IOException
*/
private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
List<String> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
List<Integer> oldPartitionIdList) throws IOException {
List<InputSplit> result = new LinkedList<InputSplit>();
UpdateVO invalidBlockVOForSegmentId = null;
    boolean isIUDTable = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(absoluteTableIdentifier);
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
// for each segment fetch blocks matching filter in Driver BTree
    List<CarbonInputSplit> dataBlocksOfSegment =
        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
            validSegments, partitionInfo, oldPartitionIdList);
    for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
      // only tables on which IUD operations have been performed need the UpdateVO;
      // for other tables, skip searching for invalidated blocks
      if (isIUDTable) {
        invalidBlockVOForSegmentId =
            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
        if (CarbonUtil
            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
                invalidBlockVOForSegmentId, updateStatusManager)) {
          continue;
        }
      }
String[] deleteDeltaFilePath = null;
try {
deleteDeltaFilePath =
updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
} catch (Exception e) {
throw new IOException(e);
}
inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
result.add(inputSplit);
}
return result;
}
protected Expression getFilterPredicates(Configuration configuration) {
try {
String filterExprString = configuration.get(FILTER_PREDICATE);
if (filterExprString == null) {
return null;
}
Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
return (Expression) filter;
} catch (IOException e) {
throw new RuntimeException("Error while reading filter expression", e);
}
}
/**
* get data blocks of given segment
*/
  private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
BitSet matchedPartitions, List<String> segmentIds, PartitionInfo partitionInfo,
List<Integer> oldPartitionIdList) throws IOException {
QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
QueryStatistic statistic = new QueryStatistic();
    // get tokens for all the required FileSystems for the table path
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
TableDataMap blockletMap = DataMapStoreManager.getInstance()
.getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
    List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
int partitionIndex = 0;
List<Integer> partitionIdList = new ArrayList<>();
if (partitionInfo != null) {
partitionIdList = partitionInfo.getPartitionIds();
}
for (Blocklet blocklet : prunedBlocklets) {
int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
      // oldPartitionIdList is only used by the alter table partition command, because it
      // changes the partition info first and then reads data;
      // all other queries should use the newest partitionIdList
if (partitionInfo != null) {
if (oldPartitionIdList != null) {
partitionIndex = oldPartitionIdList.indexOf(partitionId);
} else {
partitionIndex = partitionIdList.indexOf(partitionId);
}
}
if (partitionIndex != -1) {
        // the matchedPartitions variable will be null in two cases:
        // 1. the table is not a partition table
        // 2. the table is a partition table and all partitions are matched by the query
        // for a partition table, the task id in the carbondata file name is the partition id,
        // so if this partition is not required, skip it here
if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
          resultFilteredBlocks.add(convertToCarbonInputSplit(blocklet));
}
}
}
statistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
    return resultFilteredBlocks;
}
  private CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet)
      throws IOException {
    blocklet.updateLocations();
    CarbonInputSplit split =
        CarbonInputSplit.from(blocklet.getSegmentId(),
new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
split.setDetailInfo(blocklet.getDetailInfo());
return split;
}
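  /**
   * {@inheritDoc}
   *
   * <p>A minimal task-side sketch ({@code split} and {@code context} come from the framework;
   * the loop is illustrative):
   * <pre>{@code
   *   RecordReader<Void, T> reader = format.createRecordReader(split, context);
   *   reader.initialize(split, context);
   *   while (reader.nextKeyValue()) {
   *     T row = reader.getCurrentValue();
   *   }
   *   reader.close();
   * }</pre>
   */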
@Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
return new CarbonRecordReader<T>(queryModel, readSupport);
}
public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
    // the query plan includes the projection columns
String projection = getColumnProjection(configuration);
CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable,
getDataTypeConverter(configuration));
// set the filter to the query model in order to filter blocklet before scan
Expression filter = getFilterPredicates(configuration);
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
queryModel.setFilterExpressionResolverTree(filterIntf);
    // update the file-level index store if there are invalid segments
if (inputSplit instanceof CarbonMultiBlockSplit) {
CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
if (invalidSegments.size() > 0) {
queryModel.setInvalidSegmentIds(invalidSegments);
}
List<UpdateVO> invalidTimestampRangeList =
split.getAllSplits().get(0).getInvalidTimestampRange();
if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
}
}
return queryModel;
}
public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
    // by default, the dictionary decode read support class is used
CarbonReadSupport<T> readSupport = null;
if (readSupportClass != null) {
try {
Class<?> myClass = Class.forName(readSupportClass);
Constructor<?> constructor = myClass.getConstructors()[0];
Object object = constructor.newInstance();
if (object instanceof CarbonReadSupport) {
readSupport = (CarbonReadSupport) object;
}
} catch (ClassNotFoundException ex) {
LOG.error("Class " + readSupportClass + "not found", ex);
} catch (Exception ex) {
LOG.error("Error while creating " + readSupportClass, ex);
}
} else {
readSupport = new DictionaryDecodeReadSupport<>();
}
return readSupport;
}
@Override protected boolean isSplitable(JobContext context, Path filename) {
try {
      // don't split the file if it resides on a local file system
FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
if (fileSystem instanceof LocalFileSystem) {
return false;
}
} catch (Exception e) {
return true;
}
return true;
}
/**
   * Required to be moved to core.
*
* @return updateExtension
*/
private String getUpdateExtension() {
// TODO: required to modify when supporting update, mostly will be update timestamp
return "update";
}
/**
   * Return the valid segments to access.
*/
private String[] getSegmentsToAccess(JobContext job) {
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
if (segmentString.trim().isEmpty()) {
return new String[0];
}
return segmentString.split(",");
}
/**
   * Get the row count of each block and the mapping of segment id to block count.
*
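   * <p>A minimal sketch ({@code format}, {@code job} and {@code identifier} are illustrative):
   * <pre>{@code
   *   // maps "segmentId/blockName" to row count, and segment id to block count
   *   BlockMappingVO mapping = format.getBlockRowCount(job, identifier);
   * }</pre>
   *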
* @param job
* @param identifier
* @return
* @throws IOException
* @throws KeyGenException
*/
public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
throws IOException, KeyGenException {
TableDataMap blockletMap = DataMapStoreManager.getInstance()
.getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
new SegmentStatusManager(identifier).getValidAndInvalidSegments();
Map<String, Long> blockRowCountMapping =
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Map<String, Long> segmentAndBlockCountMapping =
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
for (Blocklet blocklet : blocklets) {
String blockName = blocklet.getPath().toString();
blockName = CarbonTablePath.getCarbonDataFileName(blockName);
blockName = blockName + CarbonTablePath.getCarbonDataExtension();
long rowCount = blocklet.getDetailInfo().getRowCount();
String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
      // if the block is invalid then don't add the count
SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
Long blockCount = blockRowCountMapping.get(key);
if (blockCount == null) {
blockCount = 0L;
Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
if (count == null) {
count = 0L;
}
segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
}
blockCount += rowCount;
blockRowCountMapping.put(key, blockCount);
}
}
return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
}
/**
   * Setting a converter is optional; if the user does not set one, a default converter is used.
*
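   * <p>A minimal sketch, using the default converter from this module
   * ({@code conf} is illustrative):
   * <pre>{@code
   *   CarbonTableInputFormat.setDataTypeConverter(conf, new DataTypeConverterImpl());
   * }</pre>
   *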
* @param configuration
* @param converter is the Data type converter for different computing engine
* @throws IOException
*/
public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
throws IOException {
if (null != converter) {
configuration.set(CARBON_CONVERTER,
ObjectSerializationUtil.convertObjectToString(converter));
}
}
public static DataTypeConverter getDataTypeConverter(Configuration configuration)
throws IOException {
String converter = configuration.get(CARBON_CONVERTER);
if (converter == null) {
return new DataTypeConverterImpl();
}
return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
}
}