/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.hadoop.CarbonInputFormat;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
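
/**
 * Hive-facing InputFormat for CarbonData tables: it wraps the mapreduce based
 * CarbonInputFormat and exposes it through the mapred API that Hive expects,
 * producing CarbonHiveInputSplits and CarbonHiveRecordReaders.
 */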
public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
/**
 * Reads the table schema from the physical schema file and populates it into the
 * CARBON_TABLE configuration property.
 *
 * @param configuration job configuration to populate with the serialized CarbonTable
 * @param paths input path used to locate the table store, may be null
 * @throws IOException if the schema file cannot be read
 */
private static void populateCarbonTable(Configuration configuration, String paths)
throws IOException {
String dirs = configuration.get(INPUT_DIR, "");
String[] inputPaths = StringUtils.split(dirs);
String validInputPath = null;
if (inputPaths.length == 0) {
throw new InvalidPathException("No input paths specified in job");
} else {
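// find the configured input directory that contains the requested path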
if (paths != null) {
for (String inputPath : inputPaths) {
if (paths.startsWith(inputPath.replace("file:", ""))) {
validInputPath = inputPath;
break;
}
}
}
}
if (validInputPath == null) {
  throw new InvalidPathException("No valid input path found for table: " + paths);
}
AbsoluteTableIdentifier absoluteTableIdentifier =
    AbsoluteTableIdentifier.fromTablePath(validInputPath);
// read the schema file from the store so that the table id persisted in the schema is used
CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
setTableInfo(configuration, carbonTable.getTableInfo());
}
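/**
 * Populates the CARBON_TABLE property for the given path and deserializes the
 * CarbonTable back from the configuration.
 */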
private static CarbonTable getCarbonTable(Configuration configuration, String path)
throws IOException {
populateCarbonTable(configuration, path);
// read it from schema file in the store
String carbonTableStr = configuration.get(CARBON_TABLE);
return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
}
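/**
 * Delegates split computation to CarbonInputFormat and wraps every CarbonInputSplit
 * in a CarbonHiveInputSplit so Hive's mapred API can consume it.
 */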
@Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
InputSplit[] splits = new InputSplit[splitList.size()];
CarbonInputSplit split;
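// convert each mapreduce split into a Hive-compatible mapred split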
for (int i = 0; i < splitList.size(); i++) {
split = (CarbonInputSplit) splitList.get(i);
splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(), split.getStart(),
split.getLength(), split.getLocations(), split.getNumberOfBlocklets(), split.getVersion(),
split.getBlockStorageIdMap());
}
return splits;
}
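/**
 * Builds the QueryModel for the split's table path and returns a CarbonHiveRecordReader
 * that uses CarbonDictionaryDecodeReadSupport to decode dictionary encoded columns.
 */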
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
Reporter reporter) throws IOException {
String path = null;
if (inputSplit instanceof CarbonHiveInputSplit) {
path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
}
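// build the query model (projection and filter) for the table at this path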
QueryModel queryModel = getQueryModel(jobConf, path);
CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
}
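/**
 * Creates the QueryModel (projection plus filter) used to scan the carbon table.
 */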
private QueryModel getQueryModel(Configuration configuration, String path) throws IOException {
CarbonTable carbonTable = getCarbonTable(configuration, path);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
String projection = getProjection(configuration, carbonTable,
identifier.getCarbonTableIdentifier().getTableName());
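// build the query plan for the projected columns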
CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
QueryModel queryModel =
QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl());
// set the filter to the query model in order to filter blocklet before scan
Expression filter = getFilterPredicates(configuration);
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
queryModel.setFilterExpressionResolverTree(filterIntf);
return queryModel;
}
/**
 * Returns the projection (a comma-separated column list) for the Carbon query.
 *
 * @param configuration job configuration carrying the requested read columns
 * @param carbonTable the carbon table being queried
 * @param tableName name of the table
 * @return comma-separated list of projection column names
 */
private String getProjection(Configuration configuration, CarbonTable carbonTable,
String tableName) {
// the query plan includes only the projection columns
String projection = getColumnProjection(configuration);
if (projection == null) {
  // fall back to the column list that Hive publishes for the read
  projection = configuration.get("hive.io.file.readcolumn.names");
}
List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(tableName);
List<String> carbonColumnNames = new ArrayList<>();
StringBuilder allColumns = new StringBuilder();
StringBuilder projectionColumns = new StringBuilder();
for (CarbonColumn column : carbonColumns) {
  carbonColumnNames.add(column.getColName());
  allColumns.append(column.getColName()).append(",");
}
if (projection != null && !projection.equals("")) {
  String[] columnNames = projection.split(",");
  // verify that the columns requested by Hive exist in the table
  for (String col : columnNames) {
    // the show columns command will return these columns
    if (carbonColumnNames.contains(col)) {
      projectionColumns.append(col).append(",");
    }
  }
  // strip the trailing comma before returning
  return projectionColumns.substring(0, projectionColumns.lastIndexOf(","));
} else {
  // no projection requested: return all columns of the table
  return allColumns.substring(0, allColumns.lastIndexOf(","));
}
}
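/**
 * Carbon computes its own splits, so returning true tells Hive's CombineHiveInputFormat
 * not to combine them.
 */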
@Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
return true;
}
}