| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.hadoop.util; |
| |
| import java.io.IOException; |
| import java.text.SimpleDateFormat; |
| import java.util.List; |
| import java.util.Locale; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; |
| import org.apache.carbondata.core.datamap.DataMapFilter; |
| import org.apache.carbondata.core.datamap.DataMapJob; |
| import org.apache.carbondata.core.datamap.DataMapUtil; |
| import org.apache.carbondata.core.exception.InvalidConfigurationException; |
| import org.apache.carbondata.core.indexstore.PartitionSpec; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.scan.expression.Expression; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.CarbonSessionInfo; |
| import org.apache.carbondata.core.util.ThreadLocalSessionInfo; |
| import org.apache.carbondata.hadoop.CarbonProjection; |
| import org.apache.carbondata.hadoop.api.CarbonInputFormat; |
| import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Utility class |
| */ |
| public class CarbonInputFormatUtil { |
| |
| /** |
| * Attribute for Carbon LOGGER. |
| */ |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(CarbonProperties.class.getName()); |
| |
| public static <V> CarbonTableInputFormat<V> createCarbonInputFormat( |
| AbsoluteTableIdentifier identifier, |
| Job job) throws IOException { |
| CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>(); |
| CarbonTableInputFormat.setDatabaseName( |
| job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName()); |
| CarbonTableInputFormat.setTableName( |
| job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); |
| FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); |
| setDataMapJobIfConfigured(job.getConfiguration()); |
| return carbonInputFormat; |
| } |
| |
| public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat( |
| AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException { |
| CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>(); |
| CarbonTableInputFormat.setPartitionIdList( |
| job.getConfiguration(), partitionId); |
| CarbonTableInputFormat.setDatabaseName( |
| job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName()); |
| CarbonTableInputFormat.setTableName( |
| job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); |
| FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); |
| setDataMapJobIfConfigured(job.getConfiguration()); |
| return carbonTableInputFormat; |
| } |
| |
| public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat( |
| Job job, |
| CarbonTable carbonTable, |
| String[] projectionColumns, |
| Expression filterExpression, |
| List<PartitionSpec> partitionNames, |
| DataMapJob dataMapJob) throws IOException, InvalidConfigurationException { |
| Configuration conf = job.getConfiguration(); |
| CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo()); |
| CarbonInputFormat.setDatabaseName(conf, carbonTable.getTableInfo().getDatabaseName()); |
| CarbonInputFormat.setTableName(conf, carbonTable.getTableInfo().getFactTable().getTableName()); |
| if (partitionNames != null) { |
| CarbonInputFormat.setPartitionsToPrune(conf, partitionNames); |
| } |
| CarbonInputFormat |
| .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable()); |
| CarbonProjection columnProjection = new CarbonProjection(projectionColumns); |
| return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(), |
| new DataMapFilter(carbonTable, filterExpression, true), columnProjection, dataMapJob); |
| } |
| |
| private static <V> CarbonTableInputFormat<V> createInputFormat( |
| Configuration conf, |
| AbsoluteTableIdentifier identifier, |
| DataMapFilter dataMapFilter, |
| CarbonProjection columnProjection, |
| DataMapJob dataMapJob) throws InvalidConfigurationException, IOException { |
| CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>(); |
| CarbonInputFormat.setTablePath( |
| conf, |
| identifier.appendWithLocalPrefix(identifier.getTablePath())); |
| CarbonInputFormat.setQuerySegment(conf, identifier); |
| CarbonInputFormat.setFilterPredicates(conf, dataMapFilter); |
| CarbonInputFormat.setColumnProjection(conf, columnProjection); |
| if (dataMapJob != null) { |
| DataMapUtil.setDataMapJob(conf, dataMapJob); |
| } else { |
| setDataMapJobIfConfigured(conf); |
| } |
| // when validate segments is disabled in thread local update it to CarbonTableInputFormat |
| CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo(); |
| if (carbonSessionInfo != null) { |
| String tableUniqueKey = identifier.getDatabaseName() + "." + identifier.getTableName(); |
| String validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + |
| tableUniqueKey; |
| CarbonInputFormat.setValidateSegmentsToAccess( |
| conf, |
| Boolean.valueOf(carbonSessionInfo.getThreadParams().getProperty( |
| validateInputSegmentsKey, "true"))); |
| String queryOnPreAggStreamingKey = CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + |
| tableUniqueKey; |
| boolean queryOnPreAggStreaming = Boolean.valueOf(carbonSessionInfo.getThreadParams() |
| .getProperty(queryOnPreAggStreamingKey, "false")); |
| String inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey; |
| CarbonInputFormat.setValidateSegmentsToAccess(conf, |
| Boolean.valueOf(carbonSessionInfo.getThreadParams() |
| .getProperty(validateInputSegmentsKey, "true"))); |
| CarbonInputFormat.setQuerySegment( |
| conf, |
| carbonSessionInfo.getThreadParams().getProperty( |
| inputSegmentsKey, |
| CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*"))); |
| if (queryOnPreAggStreaming) { |
| CarbonInputFormat.setAccessStreamingSegments(conf, true); |
| carbonSessionInfo.getThreadParams().removeProperty(queryOnPreAggStreamingKey); |
| carbonSessionInfo.getThreadParams().removeProperty(inputSegmentsKey); |
| carbonSessionInfo.getThreadParams().removeProperty(validateInputSegmentsKey); |
| } |
| } |
| return format; |
| } |
| |
| /** |
| * This method set DataMapJob if configured |
| * |
| * @param conf |
| * @throws IOException |
| */ |
| public static void setDataMapJobIfConfigured(Configuration conf) throws IOException { |
| String className = "org.apache.carbondata.indexserver.EmbeddedDataMapJob"; |
| DataMapUtil.setDataMapJob(conf, DataMapUtil.createDataMapJob(className)); |
| } |
| |
| public static String createJobTrackerID(java.util.Date date) { |
| return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date); |
| } |
| |
| public static JobID getJobId(java.util.Date date, int batch) { |
| String jobtrackerID = createJobTrackerID(date); |
| return new JobID(jobtrackerID, batch); |
| } |
| |
| } |