/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.api;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.DataMapChooser;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.security.TokenCache;
/**
* Base class for the carbondata input formats. There are two implementations:
* 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support.
*
* 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support,
* such as segment and explicit schema metadata.
*
* @param <T> the value type returned by the record reader
*/
public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// comma separated list of input segment numbers
public static final String INPUT_SEGMENT_NUMBERS =
"mapreduce.input.carboninputformat.segmentnumbers";
private static final String VALIDATE_INPUT_SEGMENT_IDs =
"mapreduce.input.carboninputformat.validsegments";
// partition id list used by the alter table partition command
private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class);
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_TRANSACTIONAL_TABLE =
"mapreduce.input.carboninputformat.transactional";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
private static final String PARTITIONS_TO_PRUNE =
"mapreduce.input.carboninputformat.partitions.to.prune";
private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
// record segment number and hit blocks
protected int numSegments = 0;
protected int numStreamSegments = 0;
protected int numBlocks = 0;
public int getNumSegments() {
return numSegments;
}
public int getNumStreamSegments() {
return numStreamSegments;
}
public int getNumBlocks() {
return numBlocks;
}
/**
* Set the `tableInfo` in `configuration`
*/
public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
throws IOException {
if (null != tableInfo) {
configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
}
}
/**
* Get TableInfo object from `configuration`
*/
protected static TableInfo getTableInfo(Configuration configuration) throws IOException {
String tableInfoStr = configuration.get(TABLE_INFO);
if (tableInfoStr == null) {
return null;
} else {
TableInfo output = new TableInfo();
output.readFields(new DataInputStream(
new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
return output;
}
}
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
throws IOException;
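/**
 * Set the table path (as FileInputFormat.INPUT_DIR) in `configuration`
 */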
public static void setTablePath(Configuration configuration, String tablePath) {
configuration.set(FileInputFormat.INPUT_DIR, tablePath);
}
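/**
 * Set whether the table is a transactional table in `configuration`
 */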
public static void setTransactionalTable(Configuration configuration,
boolean isTransactionalTable) {
configuration.set(CARBON_TRANSACTIONAL_TABLE, String.valueOf(isTransactionalTable));
}
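/**
 * Set the partition id list (used for alter partition) in `configuration`
 */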
public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
}
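/**
 * Serialize the given `dataMapJob` and set it in `configuration`
 */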
public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
throws IOException {
if (dataMapJob != null) {
String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
configuration.set(DATA_MAP_DSTR, toString);
}
}
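/**
 * Get the DataMapJob from `configuration`, or null if it is not set
 */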
public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
String jobString = configuration.get(DATA_MAP_DSTR);
if (jobString != null) {
return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
}
return null;
}
/**
* It sets unresolved filter expression.
*
* @param configuration
* @param filterExpression
*/
public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
if (filterExpression == null) {
return;
}
try {
String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
configuration.set(FILTER_PREDICATE, filterString);
} catch (Exception e) {
throw new RuntimeException("Error while setting filter expression to Job", e);
}
}
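/**
 * Set the projection columns as a comma separated string in `configuration`
 */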
public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
if (projection == null || projection.isEmpty()) {
return;
}
String[] allColumns = projection.getAllColumns();
StringBuilder builder = new StringBuilder();
for (String column : allColumns) {
builder.append(column).append(",");
}
String columnString = builder.toString();
columnString = columnString.substring(0, columnString.length() - 1);
configuration.set(COLUMN_PROJECTION, columnString);
}
public static String getColumnProjection(Configuration configuration) {
return configuration.get(COLUMN_PROJECTION);
}
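/**
 * Enable or disable pruning with fine grained (FG) datamaps
 */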
public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
}
public static boolean isFgDataMapPruningEnable(Configuration configuration) {
String enable = configuration.get(FGDATAMAP_PRUNING);
// if FGDATAMAP_PRUNING is not set, by default we will use the FG datamap
return (enable == null) || enable.equalsIgnoreCase("true");
}
/**
* Set list of segments to access
*/
public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
}
/**
* Set `CARBON_INPUT_SEGMENTS` from property to configuration
*/
public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
String segmentNumbersFromProperty = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
if (!segmentNumbersFromProperty.trim().equals("*")) {
CarbonInputFormat.setSegmentsToAccess(conf,
Segment.toSegmentList(segmentNumbersFromProperty.split(","), null));
}
}
/**
* Set list of segments to access from the given comma separated segment list
*/
public static void setQuerySegment(Configuration conf, String segmentList) {
if (!segmentList.trim().equals("*")) {
CarbonInputFormat
.setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(","), null));
}
}
/**
* set whether to validate the segments to access
*/
public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
configuration.set(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
}
/**
* get whether to validate the segments to access
*/
public static boolean getValidateSegmentsToAccess(Configuration configuration) {
return configuration.get(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
.equalsIgnoreCase("true");
}
/**
* set list of partitions to prune
*/
public static void setPartitionsToPrune(Configuration configuration,
List<PartitionSpec> partitions) {
if (partitions == null) {
return;
}
try {
String partitionString =
ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
configuration.set(PARTITIONS_TO_PRUNE, partitionString);
} catch (Exception e) {
throw new RuntimeException("Error while setting patition information to Job" + partitions, e);
}
}
/**
* get list of partitions to prune
*/
public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
throws IOException {
String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
if (partitionString != null) {
return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
}
return null;
}
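/**
 * Build the AbsoluteTableIdentifier from the table path, database name and
 * table name set in `configuration`
 */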
public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
String tablePath = configuration.get(INPUT_DIR, "");
try {
return AbsoluteTableIdentifier
.from(tablePath, getDatabaseName(configuration), getTableName(configuration));
} catch (InvalidConfigurationException e) {
throw new IOException(e);
}
}
/**
* {@inheritDoc}
* The configuration FileInputFormat.INPUT_DIR is used to get the table path to read.
*
* @param job
* @return List<InputSplit> list of CarbonInputSplit
* @throws IOException
*/
@Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
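/**
 * Get the filter expression from `configuration`, or null if no filter is set
 */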
protected Expression getFilterPredicates(Configuration configuration) {
try {
String filterExprString = configuration.get(FILTER_PREDICATE);
if (filterExprString == null) {
return null;
}
Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
return (Expression) filter;
} catch (IOException e) {
throw new RuntimeException("Error while reading filter expression", e);
}
}
/**
* get data blocks of given segment
*/
protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
FilterResolverIntf resolver, BitSet matchedPartitions, List<Segment> segmentIds,
PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException {
QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
QueryStatistic statistic = new QueryStatistic();
// get tokens for all the required FileSystem for table path
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
List<ExtendedBlocklet> prunedBlocklets =
getPrunedBlocklets(job, carbonTable, resolver, segmentIds);
List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
int partitionIndex = 0;
List<Integer> partitionIdList = new ArrayList<>();
if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
partitionIdList = partitionInfo.getPartitionIds();
}
for (ExtendedBlocklet blocklet : prunedBlocklets) {
long partitionId = CarbonTablePath.DataFileUtil
.getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
// oldPartitionIdList is only used by the alter table partition command, because it changes
// the partition info first and then reads the data.
// Other normal queries should use the newest partitionIdList.
if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
if (oldPartitionIdList != null) {
partitionIndex = oldPartitionIdList.indexOf((int) partitionId);
} else {
partitionIndex = partitionIdList.indexOf((int) partitionId);
}
}
if (partitionIndex != -1) {
// matchedPartitions variable will be null in two cases:
// 1. the table is not a partition table
// 2. the table is a partition table, and all partitions are matched by the query
// for a partition table, the task id of the carbondata file name is the partition id.
// if this partition is not required, skip it here.
if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
if (inputSplit != null) {
resultFilteredBlocks.add(inputSplit);
}
}
}
}
statistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
return resultFilteredBlocks;
}
/**
* Prune the blocklets using the filter expression with available datamaps.
*/
private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException {
boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
// First prune using default datamap on driver side.
DataMapExprWrapper dataMapExprWrapper = DataMapChooser.get()
.getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
List<ExtendedBlocklet> prunedBlocklets =
dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
// Get the available CG datamaps and prune further.
DataMapExprWrapper cgDataMapExprWrapper = DataMapChooser.get()
.chooseCGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
if (cgDataMapExprWrapper != null) {
// Prune segments from already pruned blocklets
pruneSegments(segmentIds, prunedBlocklets);
// Again prune with CG datamap.
if (distributedCG && dataMapJob != null) {
prunedBlocklets =
executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob,
partitionsToPrune);
} else {
prunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
}
}
// Now try to prune with FG DataMap.
dataMapExprWrapper = DataMapChooser.get()
.chooseFGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
if (dataMapExprWrapper != null && dataMapExprWrapper.getDataMapType() == DataMapLevel.FG
&& isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
// Prune segments from already pruned blocklets
pruneSegments(segmentIds, prunedBlocklets);
prunedBlocklets =
executeDataMapJob(carbonTable, resolver, segmentIds, dataMapExprWrapper, dataMapJob,
partitionsToPrune);
}
return prunedBlocklets;
}
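/**
 * Execute the distributable datamap job to prune blocklets on the cluster side,
 * then apply the datamap expression on the returned blocklets.
 */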
private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
DistributableDataMapFormat datamapDstr =
new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
partitionsToPrune, BlockletDataMapFactory.class.getName());
List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
// Apply expression on the blocklets.
prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
return prunedBlocklets;
}
/**
* Prune the segments using the already pruned blocklets: segments that do not appear
* in any pruned blocklet are removed.
* @param segments
* @param prunedBlocklets
*/
private void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
List<Segment> toBeRemovedSegments = new ArrayList<>();
for (Segment segment : segments) {
boolean found = false;
// Clear the old pruned index files if any present
segment.getFilteredIndexShardNames().clear();
// Check whether the segment exists in any of the pruned blocklets.
for (ExtendedBlocklet blocklet : prunedBlocklets) {
if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
found = true;
// Set the pruned index file to the segment for further pruning.
String uniqueTaskName = CarbonTablePath.getUniqueTaskName(blocklet.getTaskName());
segment.setFilteredIndexShardName(uniqueTaskName);
}
}
// Add to remove segments list if not present in pruned blocklets.
if (!found) {
toBeRemovedSegments.add(segment);
}
}
// Remove all segments that are not present in the pruned blocklets
segments.removeAll(toBeRemovedSegments);
}
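/**
 * Convert a pruned ExtendedBlocklet into a CarbonInputSplit
 */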
private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
CarbonInputSplit split = CarbonInputSplit
.from(blocklet.getSegmentId(), blocklet.getBlockletId(),
new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
blocklet.getLocations()),
ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
blocklet.getDataMapWriterPath());
split.setDetailInfo(blocklet.getDetailInfo());
return split;
}
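/**
 * Create a CarbonRecordReader for the given split, using the configured read support
 */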
@Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
return new CarbonRecordReader<T>(queryModel, readSupport);
}
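/**
 * Build the QueryModel for the given split from the projection, filter and
 * data type converter set in `configuration`
 */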
public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// set projection column in the query model
String projectionString = getColumnProjection(configuration);
String[] projectColumns;
if (projectionString != null) {
projectColumns = projectionString.split(",");
} else {
projectColumns = new String[]{};
}
QueryModel queryModel = new QueryModelBuilder(carbonTable)
.projectColumns(projectColumns)
.filterExpression(getFilterPredicates(configuration))
.dataConverter(getDataTypeConverter(configuration))
.build();
// update the file level index store if there are invalid segments
if (inputSplit instanceof CarbonMultiBlockSplit) {
CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
if (invalidSegments.size() > 0) {
queryModel.setInvalidSegmentIds(invalidSegments);
}
List<UpdateVO> invalidTimestampRangeList =
split.getAllSplits().get(0).getInvalidTimestampRange();
if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
}
}
return queryModel;
}
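/**
 * Instantiate the read support class set in `configuration`, falling back to
 * DictionaryDecodeReadSupport when none is configured
 */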
public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
// By default it uses the dictionary decode read support class
CarbonReadSupport<T> readSupport = null;
if (readSupportClass != null) {
try {
Class<?> myClass = Class.forName(readSupportClass);
Constructor<?> constructor = myClass.getConstructors()[0];
Object object = constructor.newInstance();
if (object instanceof CarbonReadSupport) {
readSupport = (CarbonReadSupport) object;
}
} catch (ClassNotFoundException ex) {
LOG.error("Class " + readSupportClass + "not found", ex);
} catch (Exception ex) {
LOG.error("Error while creating " + readSupportClass, ex);
}
} else {
readSupport = new DictionaryDecodeReadSupport<>();
}
return readSupport;
}
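/**
 * Files on the local file system are not split; files on other file systems are splittable
 */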
@Override protected boolean isSplitable(JobContext context, Path filename) {
try {
// Don't split the file if it is on the local file system
FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
if (fileSystem instanceof LocalFileSystem) {
return false;
}
} catch (Exception e) {
return true;
}
return true;
}
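/**
 * Set the CarbonReadSupport implementation class in `configuration`
 */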
public static void setCarbonReadSupport(Configuration configuration,
Class<? extends CarbonReadSupport> readSupportClass) {
if (readSupportClass != null) {
configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
}
}
/**
* It is optional; if the user does not set it, then it reads from the store.
*
* @param configuration
* @param converterClass is the Data type converter for different computing engine
*/
public static void setDataTypeConverter(
Configuration configuration, Class<? extends DataTypeConverter> converterClass) {
if (null != converterClass) {
configuration.set(CARBON_CONVERTER, converterClass.getCanonicalName());
}
}
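/**
 * Instantiate the data type converter set in `configuration`, or DataTypeConverterImpl
 * if none is configured
 */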
public static DataTypeConverter getDataTypeConverter(Configuration configuration)
throws IOException {
String converterClass = configuration.get(CARBON_CONVERTER);
if (converterClass == null) {
return new DataTypeConverterImpl();
}
try {
return (DataTypeConverter) Class.forName(converterClass).newInstance();
} catch (Exception e) {
throw new IOException(e);
}
}
public static void setDatabaseName(Configuration configuration, String databaseName) {
if (null != databaseName) {
configuration.set(DATABASE_NAME, databaseName);
}
}
public static String getDatabaseName(Configuration configuration)
throws InvalidConfigurationException {
String databaseName = configuration.get(DATABASE_NAME);
if (null == databaseName) {
throw new InvalidConfigurationException("Database name is not set.");
}
return databaseName;
}
public static void setTableName(Configuration configuration, String tableName) {
if (null != tableName) {
configuration.set(TABLE_NAME, tableName);
}
}
public static String getTableName(Configuration configuration)
throws InvalidConfigurationException {
String tableName = configuration.get(TABLE_NAME);
if (tableName == null) {
throw new InvalidConfigurationException("Table name is not set");
}
return tableName;
}
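/**
 * Set whether the query can access streaming segments of the pre-aggregate table,
 * keyed by the database and table name set in `configuration`
 */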
public static void setAccessStreamingSegments(Configuration configuration, Boolean validate)
throws InvalidConfigurationException {
configuration.set(
CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + "." + getDatabaseName(
configuration) + "." + getTableName(configuration), validate.toString());
}
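/**
 * Get whether the query can access streaming segments of the pre-aggregate table;
 * defaults to false
 */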
public static boolean getAccessStreamingSegments(Configuration configuration) {
try {
return configuration.get(
CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + "." + getDatabaseName(
configuration) + "." + getTableName(
configuration), "false").equalsIgnoreCase("true");
} catch (InvalidConfigurationException e) {
return false;
}
}
}