/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.hadoop.api;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

/**
 * InputFormat for reading carbondata files without table-level metadata support.
 * The schema is inferred in the following order:
 * 1. read from the schema file if it exists
 * 2. otherwise, infer it from the data file footer
 *
 * @param <T>
 */
@InterfaceAudience.User
@InterfaceStability.Evolving
public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Serializable {
// a cache for the carbon table, it will be used on the task side
private CarbonTable carbonTable;
protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
if (carbonTable == null) {
// carbon table should be created either from deserialized table info (schema saved in
// hive metastore) or by reading schema in HDFS (schema saved in HDFS)
TableInfo tableInfo = getTableInfo(configuration);
CarbonTable localCarbonTable;
if (tableInfo != null) {
localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
} else {
String schemaPath = CarbonTablePath
.getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
TableInfo tableInfoInfer =
SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
} else {
localCarbonTable =
SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
}
}
this.carbonTable = localCarbonTable;
return localCarbonTable;
} else {
return this.carbonTable;
}
}
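
// A minimal illustrative sketch (not part of this class's API): on the task side this method is
// expected to be invoked again with the task's Configuration, so the schema is resolved once and
// then served from the carbonTable field above. The taskContext name below is a hypothetical
// placeholder.
//
//   CarbonTable table = getOrCreateCarbonTable(taskContext.getConfiguration());
//   // a second call within the same task returns the cached instance
//   CarbonTable same = getOrCreateCarbonTable(taskContext.getConfiguration());
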
/**
 * {@inheritDoc}
 * The configuration FileInputFormat.INPUT_DIR is used to get the table path to read.
 *
 * @param job job context holding the configuration
 * @return List<InputSplit> list of CarbonInputSplit
 * @throws IOException
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
if (getValidateSegmentsToAccess(job.getConfiguration())) {
// check for the external table segment (Segment_null)
// process and resolve the filter expression
Expression filter = getFilterPredicates(job.getConfiguration());
TableProvider tableProvider = new SingleTableProvider(carbonTable);
// this will be null in case of corrupt schema file.
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
carbonTable.processFilterExpression(filter, null, null);
FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider);
String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
// if external table segments are found, add them to the list
List<Segment> externalTableSegments = new ArrayList<Segment>();
Segment seg = new Segment("null", null);
externalTableSegments.add(seg);
Map<String, String> indexFiles =
new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
if (indexFiles.size() == 0) {
throw new RuntimeException("Index file not present to read the carbondata file");
}
// do block filtering and get split
List<InputSplit> splits =
getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
return splits;
}
}
return null;
}
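
// A minimal driver-side usage sketch, assuming a plain MapReduce job; the table path and the
// generic type are illustrative placeholders, and FileInputFormat refers to
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
//
//   Configuration conf = new Configuration();
//   conf.set(FileInputFormat.INPUT_DIR, "/path/to/carbon/table");   // hypothetical path
//   Job job = Job.getInstance(conf);
//   CarbonFileInputFormat<Object> format = new CarbonFileInputFormat<>();
//   // returns null when the external table segment directory (Segment_null) is absent
//   List<InputSplit> splits = format.getSplits(job);
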
/**
 * The configurations FileInputFormat.INPUT_DIR and CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
 * are used to get the table path and the segments to read.
 *
 * @return List<InputSplit> list of CarbonInputSplit for the valid segments
 * @throws IOException
 */
private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
List<Integer> oldPartitionIdList) throws IOException {
List<InputSplit> result = new LinkedList<InputSplit>();
UpdateVO invalidBlockVOForSegmentId = null;
boolean isIUDTable = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(absoluteTableIdentifier);
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
// for each segment fetch blocks matching filter in Driver BTree
List<CarbonInputSplit> dataBlocksOfSegment =
getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
validSegments, partitionInfo, oldPartitionIdList);
for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
// Get the UpdateVO for those tables on which IUD operations have been performed.
if (isIUDTable) {
invalidBlockVOForSegmentId =
updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
}
String[] deleteDeltaFilePath = null;
if (isIUDTable) {
// In case IUD is not performed on this table, avoid searching for invalidated blocks.
if (CarbonUtil
.isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
// Get the delete delta files for a block only when IUD has been performed.
try {
deleteDeltaFilePath = updateStatusManager
.getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
} catch (Exception e) {
throw new IOException(e);
}
}
inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
result.add(inputSplit);
}
return result;
}
}