/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.api;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

/**
* InputFormat for reading carbondata files without table-level metadata support.
* The schema is inferred in the following steps:
* 1. read from the schema file if it exists
* 2. otherwise, read from the data file footer
*
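* <p>
* A minimal usage sketch (an illustration only, not part of this class's contract): point
* FileInputFormat.INPUT_DIR at a directory of carbondata files, e.g. one written by the SDK,
* and let this format infer the schema and compute splits. The path and job name below are
* made up.
* <pre>{@code
* Configuration conf = new Configuration();
* conf.set(FileInputFormat.INPUT_DIR, "/path/to/carbondata/files"); // illustrative path
* Job job = Job.getInstance(conf, "carbon-file-read");
* job.setInputFormatClass(CarbonFileInputFormat.class);
* List<InputSplit> splits = new CarbonFileInputFormat<Object>().getSplits(job);
* }</pre>
*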
* @param <T>
*/
@InterfaceAudience.User
@InterfaceStability.Evolving
public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Serializable {
// a cache for the carbon table; it will be used on the task side
private CarbonTable carbonTable;
public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
CarbonTable carbonTableTemp;
if (carbonTable == null) {
// the carbon table should be created either from the deserialized table info (schema saved in
// the hive metastore) or by reading the schema from HDFS (schema saved in HDFS)
TableInfo tableInfo = getTableInfo(configuration);
CarbonTable localCarbonTable;
if (tableInfo != null) {
localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
} else {
String schemaPath = CarbonTablePath
.getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
if (!FileFactory.isFileExist(schemaPath)) {
TableInfo tableInfoInfer =
SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration), true);
localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
} else {
localCarbonTable =
SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
}
}
this.carbonTable = localCarbonTable;
return localCarbonTable;
} else {
carbonTableTemp = this.carbonTable;
return carbonTableTemp;
}
}
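/*
* Decision order implemented above:
*   getTableInfo(conf) != null  -> build the table from the deserialized TableInfo
*   no schema file on disk      -> SchemaReader.inferSchema(...) reads a data file footer
*   schema file present         -> SchemaReader.readCarbonTableFromStore(...)
* The resulting table is cached in this.carbonTable for reuse on the task side.
*/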
/**
* {@inheritDoc}
* The configuration FileInputFormat.INPUT_DIR
* is used to get the table path to read.
*
* @param job
* @return List<InputSplit> list of CarbonInputSplit
* @throws IOException
*/
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
// get all valid segments and set them into the configuration
// check for externalTable segment (Segment_null)
// process and resolve the expression
ReadCommittedScope readCommittedScope = null;
if (carbonTable.isTransactionalTable()) {
readCommittedScope = new LatestFilesReadCommittedScope(
identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
} else {
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(), job
.getConfiguration());
} else {
readCommittedScope.setConfiguration(job.getConfiguration());
}
}
// this will be null in case of corrupt schema file.
DataMapFilter filter = getFilterPredicates(job.getConfiguration());
// if external table segments are found, add them to the list
List<Segment> externalTableSegments = new ArrayList<Segment>();
Segment seg;
if (carbonTable.isTransactionalTable()) {
// In some cases the SDK writes into the segment path instead of the table path, i.e. inside
// "Fact/Part0/Segment_null". The segment in this case is named "null".
// Such a table is treated by default as a transactional table and goes through
// the CarbonFileInputFormat path. That scenario is handled in the code below.
seg = new Segment("null", null, readCommittedScope);
externalTableSegments.add(seg);
} else {
LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
for (LoadMetadataDetails load : loadMetadataDetails) {
seg = new Segment(load.getLoadName(), null, readCommittedScope);
if (fileLists != null) {
for (int i = 0; i < fileLists.size(); i++) {
if (fileLists.get(i).toString().endsWith(seg.getSegmentNo()
+ CarbonTablePath.CARBON_DATA_EXT)) {
externalTableSegments.add(seg);
break;
}
}
} else {
externalTableSegments.add(seg);
}
}
}
List<InputSplit> splits = new ArrayList<>();
boolean useBlockDataMap = job.getConfiguration().getBoolean("filter_blocks", true);
// useBlockDataMap is false for SDK reads where the user has not provided any filter; in that
// case we don't want to load the block/blocklet datamap. It is true in all other
// scenarios.
if (filter != null) {
filter.resolve(false);
}
if (useBlockDataMap) {
// do block filtering and get split
splits = getSplits(job, filter, externalTableSegments);
} else {
List<CarbonFile> carbonFiles = null;
if (null != this.fileLists) {
carbonFiles = getAllCarbonDataFiles(this.fileLists);
} else {
carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
}
for (CarbonFile carbonFile : carbonFiles) {
// The segment id is set to "null" because the SDK does not write carbondata files with
// respect to segments, so no specific segment name is present for this load.
CarbonInputSplit split =
new CarbonInputSplit("null", carbonFile.getAbsolutePath(), 0,
carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3);
split.setVersion(ColumnarFormatVersion.V3);
BlockletDetailInfo info = new BlockletDetailInfo();
split.setDetailInfo(info);
info.setBlockSize(carbonFile.getLength());
info.setVersionNumber(split.getVersion().number());
info.setUseMinMaxForPruning(false);
splits.add(split);
}
Collections.sort(splits, new Comparator<InputSplit>() {
@Override
public int compare(InputSplit o1, InputSplit o2) {
return ((CarbonInputSplit) o1).getFilePath()
.compareTo(((CarbonInputSplit) o2).getFilePath());
}
});
}
setAllColumnProjectionIfNotConfigured(job, carbonTable);
return splits;
}
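/*
* A hedged note, not part of the public contract: the "filter_blocks" flag read above defaults
* to true. An SDK-style caller that has no filter and wants to skip loading the block/blocklet
* datamap can turn it off before calling getSplits, for example:
*
*   job.getConfiguration().setBoolean("filter_blocks", false);
*
* With the default, splits come from the datamap-pruned getSplits(job, filter, segments) below.
*/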
public void setAllColumnProjectionIfNotConfigured(JobContext job, CarbonTable carbonTable) {
if (getColumnProjection(job.getConfiguration()) == null) {
// If the user projection is empty, default to projecting all columns.
// All column names are filled in during getSplits, so the projection can only be updated here.
String[] projectionColumns = projectAllColumns(carbonTable);
setColumnProjection(job.getConfiguration(), projectionColumns);
}
}
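/*
* Illustrative only (the column names are made up): a caller can avoid the all-columns default
* by configuring an explicit projection up front, using the same setter this class falls back to:
*
*   setColumnProjection(job.getConfiguration(), new String[] { "name", "age" });
*/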
private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
List<CarbonFile> carbonFiles;
try {
carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT);
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
return carbonFiles;
}
private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
List<CarbonFile> carbonFiles = new LinkedList<CarbonFile>();
try {
for (int i = 0; i < fileLists.size(); i++) {
carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return carbonFiles;
}
/**
* {@inheritDoc}
* The configurations FileInputFormat.INPUT_DIR and CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
* are used to get the table path to read.
*
* @return list of CarbonInputSplit for the data blocks matching the filter, as InputSplit
* @throws IOException
*/
private List<InputSplit> getSplits(
JobContext job,
DataMapFilter dataMapFilter,
List<Segment> validSegments) throws IOException {
numSegments = validSegments.size();
List<InputSplit> result = new LinkedList<InputSplit>();
// for each segment, fetch the blocks matching the filter in the driver-side BTree
List<CarbonInputSplit> dataBlocksOfSegment =
getDataBlocksOfSegment(job, carbonTable, dataMapFilter, validSegments,
new ArrayList<Segment>(), new ArrayList<String>());
numBlocks = dataBlocksOfSegment.size();
result.addAll(dataBlocksOfSegment);
return result;
}
}