blob: d78e44b1c4ebc771aa60b7718cf54340d8499242 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.util;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Locale;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonInputFormat;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.Logger;
/**
* Utility class
*/
public class CarbonInputFormatUtil {
/**
* Attribute for Carbon LOGGER.
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonProperties.class.getName());
public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
AbsoluteTableIdentifier identifier,
Job job) throws IOException {
CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
CarbonTableInputFormat.setDatabaseName(
job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName());
CarbonTableInputFormat.setTableName(
job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
setDataMapJobIfConfigured(job.getConfiguration());
return carbonInputFormat;
}
public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException {
CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>();
CarbonTableInputFormat.setPartitionIdList(
job.getConfiguration(), partitionId);
CarbonTableInputFormat.setDatabaseName(
job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName());
CarbonTableInputFormat.setTableName(
job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
setDataMapJobIfConfigured(job.getConfiguration());
return carbonTableInputFormat;
}
public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
Job job,
CarbonTable carbonTable,
String[] projectionColumns,
Expression filterExpression,
List<PartitionSpec> partitionNames,
DataMapJob dataMapJob) throws IOException, InvalidConfigurationException {
Configuration conf = job.getConfiguration();
CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
CarbonInputFormat.setDatabaseName(conf, carbonTable.getTableInfo().getDatabaseName());
CarbonInputFormat.setTableName(conf, carbonTable.getTableInfo().getFactTable().getTableName());
if (partitionNames != null) {
CarbonInputFormat.setPartitionsToPrune(conf, partitionNames);
}
CarbonInputFormat
.setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
CarbonProjection columnProjection = new CarbonProjection(projectionColumns);
return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
new DataMapFilter(carbonTable, filterExpression, true), columnProjection, dataMapJob);
}
private static <V> CarbonTableInputFormat<V> createInputFormat(
Configuration conf,
AbsoluteTableIdentifier identifier,
DataMapFilter dataMapFilter,
CarbonProjection columnProjection,
DataMapJob dataMapJob) throws IOException {
CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>();
CarbonInputFormat.setTablePath(
conf,
identifier.appendWithLocalPrefix(identifier.getTablePath()));
CarbonInputFormat.setQuerySegment(conf, identifier);
CarbonInputFormat.setFilterPredicates(conf, dataMapFilter);
CarbonInputFormat.setColumnProjection(conf, columnProjection);
if (dataMapJob != null) {
DataMapUtil.setDataMapJob(conf, dataMapJob);
} else {
setDataMapJobIfConfigured(conf);
}
// when validate segments is disabled in thread local update it to CarbonTableInputFormat
CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
if (carbonSessionInfo != null) {
String tableUniqueKey = identifier.getDatabaseName() + "." + identifier.getTableName();
String inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey;
CarbonInputFormat.setQuerySegment(
conf,
carbonSessionInfo.getThreadParams().getProperty(
inputSegmentsKey,
CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")));
}
return format;
}
/**
* This method set DataMapJob if configured
*
* @param conf
* @throws IOException
*/
public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
String className = "org.apache.carbondata.indexserver.EmbeddedDataMapJob";
DataMapUtil.setDataMapJob(conf, DataMapUtil.createDataMapJob(className));
}
public static String createJobTrackerID(java.util.Date date) {
return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
}
public static JobID getJobId(java.util.Date date, int batch) {
String jobtrackerID = createJobTrackerID(date);
return new JobID(jobtrackerID, batch);
}
}