/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.datamap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

public class DataMapUtil {

private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
public static final String EMBEDDED_JOB_NAME =
"org.apache.carbondata.indexserver.EmbeddedDataMapJob";
public static final String DISTRIBUTED_JOB_NAME =
"org.apache.carbondata.indexserver.DistributedDataMapJob";
private static final Logger LOGGER =
LogServiceFactory.getLogService(DataMapUtil.class.getName());

/**
 * Creates an instance of the given DataMap job class through reflection.
 *
 * @param className fully qualified name of the DataMapJob implementation
 * @return the job instance, or null if instantiation fails
 */
public static Object createDataMapJob(String className) {
try {
return Class.forName(className).getDeclaredConstructors()[0].newInstance();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return null;
}
}
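
// Illustrative usage (a sketch, not invoked from this class): instantiate the
// distributed job that ships with the index server. createDataMapJob logs and
// returns null if the class is missing or its constructor fails.
//   DataMapJob job = (DataMapJob) DataMapUtil.createDataMapJob(DISTRIBUTED_JOB_NAME);
//   if (job == null) {
//     // fall back to driver-side pruning; the cause was already logged above
//   }
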
/**
 * Serializes the given datamap job into the configuration.
 * @param configuration configuration to carry the serialized job
 * @param dataMapJob datamap job instance to serialize
 * @throws IOException if serialization fails
 */
public static void setDataMapJob(Configuration configuration, Object dataMapJob)
throws IOException {
if (dataMapJob != null) {
String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
configuration.set(DATA_MAP_DSTR, toString);
}
}

/**
 * Gets the datamap job from the configuration.
 * @param configuration job configuration
 * @return the deserialized DataMapJob, or null if none is set
 * @throws IOException if deserialization fails
 */
public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
String jobString = configuration.get(DATA_MAP_DSTR);
if (jobString != null) {
return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
}
return null;
}
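
// Illustrative round trip (a sketch, assuming the job class is serializable, as the
// index server jobs are): a job set on a Configuration can be read back from it.
//   Configuration conf = new Configuration();
//   DataMapUtil.setDataMapJob(conf, DataMapUtil.createDataMapJob(EMBEDDED_JOB_NAME));
//   DataMapJob job = DataMapUtil.getDataMapJob(conf); // null if nothing was set
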
/**
 * Gets the datamap job and calls execute on it. This job is launched before datamaps are
 * cleared from the driver side during drop table / drop datamap, and it clears the
 * datamaps on the executor side.
 * @param dataMapJob job to execute
 * @param carbonTable table whose datamaps are to be cleared
 * @param dataMapToClear name of the datamap to clear, or an empty string to clear all
 * @throws IOException if segment information cannot be read
 */
private static void executeClearDataMapJob(DataMapJob dataMapJob,
CarbonTable carbonTable, String dataMapToClear) throws IOException {
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
List<String> invalidSegment = new ArrayList<>();
for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) {
invalidSegment.add(segment.getSegmentNo());
}
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(),
invalidSegment, true, dataMapToClear);
try {
dataMapJob.execute(dataMapFormat);
} catch (Exception e) {
// Consider a scenario where the clear datamap job is called from drop table
// and the index server crashes. In this case no exception should be thrown and
// drop table should complete.
LOGGER.error("Failed to execute Datamap clear Job", e);
}
}

public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
throws IOException {
executeClearDataMapJob(carbonTable, jobClassName, "");
}

static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName,
String dataMapToClear) throws IOException {
DataMapJob dataMapJob = (DataMapJob) createDataMapJob(jobClassName);
if (dataMapJob == null) {
return;
}
executeClearDataMapJob(dataMapJob, carbonTable, dataMapToClear);
}

public static DataMapJob getEmbeddedJob() {
DataMapJob dataMapJob = (DataMapJob) DataMapUtil.createDataMapJob(EMBEDDED_JOB_NAME);
if (dataMapJob == null) {
throw new ExceptionInInitializerError("Unable to create EmbeddedDataMapJob");
}
return dataMapJob;
}

/**
 * Prunes the segment list down to the segments that still own at least one of the already
 * pruned blocklets, recording the surviving index shard names on each segment.
 */
public static void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
Set<Segment> validSegments = new HashSet<>();
for (ExtendedBlocklet blocklet : prunedBlocklets) {
// Clear the old pruned index files if any present
blocklet.getSegment().getFilteredIndexShardNames().clear();
// Set the pruned index file to the segment
// for further pruning.
String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
blocklet.getSegment().setFilteredIndexShardName(shardName);
validSegments.add(blocklet.getSegment());
}
segments.clear();
segments.addAll(validSegments);
}
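
// Illustrative effect of pruneSegments (hypothetical data): if the input segment list is
// ["0", "1", "2"] and the pruned blocklets all come from shards of segments "0" and "2",
// the list is rewritten to ["0", "2"], and each surviving segment carries only the shard
// names of its pruned index files for the next pruning round.
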
static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
DataMapChooser dataMapChooser) throws IOException {
if (null == dataMapChooser) {
return blocklets;
}
pruneSegments(segmentsToLoad, blocklets);
List<ExtendedBlocklet> cgDataMaps = pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
partitions, blocklets,
DataMapLevel.CG, dataMapChooser);
pruneSegments(segmentsToLoad, cgDataMaps);
return pruneDataMaps(table, filterResolverIntf, segmentsToLoad,
partitions, cgDataMaps,
DataMapLevel.FG, dataMapChooser);
}
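
// Pruning cascade (illustrative): with, say, a bloomfilter (CG) and a lucene (FG) datamap
// matching the filter, the method above first narrows the segment list using the already
// pruned blocklets, applies CG datamaps, narrows the segments again from the CG result,
// and finally applies FG datamaps to the remaining segments.
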
static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad,
List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, DataMapLevel dataMapLevel,
DataMapChooser dataMapChooser)
throws IOException {
DataMapExprWrapper dataMapExprWrapper =
dataMapChooser.chooseDataMap(dataMapLevel, filterResolverIntf);
if (dataMapExprWrapper != null) {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
// Prune segments from already pruned blocklets
for (DataMapDistributableWrapper wrapper : dataMapExprWrapper
.toDistributable(segmentsToLoad)) {
TableDataMap dataMap = DataMapStoreManager.getInstance()
.getDataMap(table, wrapper.getDistributable().getDataMapSchema());
List<DataMap> dataMaps = dataMap.getTableDataMaps(wrapper.getDistributable());
List<ExtendedBlocklet> prunedBlocklets = new ArrayList<>();
if (table.isTransactionalTable()) {
prunedBlocklets.addAll(dataMap.prune(dataMaps, wrapper.getDistributable(),
dataMapExprWrapper.getFilterResolverIntf(wrapper.getUniqueId()), partitions));
} else {
prunedBlocklets
.addAll(dataMap.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf),
partitions));
}
// For all blocklets initialize the detail info so that it can be serialized to the driver.
for (ExtendedBlocklet blocklet : prunedBlocklets) {
blocklet.getDetailInfo();
blocklet.setDataMapUniqueId(wrapper.getUniqueId());
}
extendedBlocklets.addAll(prunedBlocklets);
}
return dataMapExprWrapper.pruneBlocklets(extendedBlocklets);
}
return blocklets;
}

/**
 * Gets the datamap job and calls execute on it; the job is launched for distributed
 * CG or FG datamap pruning.
 * @return list of extended blocklets after pruning
 */
public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
List<String> segmentsToBeRefreshed) throws IOException {
return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
invalidSegments, level, false, segmentsToBeRefreshed, false);
}

/**
 * Gets the datamap job and calls execute on it; the job is launched for distributed
 * CG or FG datamap pruning, optionally as an embedded fallback job or a count(*) job.
 * @return list of extended blocklets after pruning
 */
public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
throws IOException {
List<String> invalidSegmentNo = new ArrayList<>();
for (Segment segment : invalidSegments) {
invalidSegmentNo.add(segment.getSegmentNo());
}
invalidSegmentNo.addAll(segmentsToBeRefreshed);
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
partitionsToPrune, false, level, isFallbackJob, false);
if (isCountJob) {
dataMapFormat.setCountStarJob();
dataMapFormat.setIsWriteToFile(false);
}
return dataMapJob.execute(dataMapFormat);
}
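
// Illustrative call (a sketch; argument values are hypothetical): a count(*) job
// aggregates row counts on the index server instead of writing pruned blocklets to
// file, which is why setCountStarJob() and setIsWriteToFile(false) are applied above.
//   executeDataMapJob(table, resolver, dataMapJob, partitions, validSegments,
//       invalidSegments, DataMapLevel.CG, false, new ArrayList<String>(), true);
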
public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
CarbonTable carbonTable, Configuration configuration) throws IOException {
SegmentStatusManager ssm =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
return ssm.getValidAndInvalidSegments(carbonTable.isChildTableForMV());
}

/**
 * Returns the valid segment list for a given RelationIdentifier.
 *
 * @param relationIdentifier identifier of the relation (table) whose segments are listed
 * @return list of valid segment ids
 * @throws IOException if the segment status cannot be read
 */
public static List<String> getMainTableValidSegmentList(RelationIdentifier relationIdentifier)
throws IOException {
List<String> segmentList = new ArrayList<>();
List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier
.from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments();
for (Segment segment : validSegments) {
segmentList.add(segment.getSegmentNo());
}
return segmentList;
}

public static String getMaxSegmentID(List<String> segmentList) {
// Segment ids are numeric strings (compaction can produce ids like "2.1"),
// so compare them as doubles rather than lexicographically.
double[] segment = new double[segmentList.size()];
int i = 0;
for (String id : segmentList) {
segment[i] = Double.parseDouble(id);
i++;
}
Arrays.sort(segment);
String maxId = Double.toString(segment[segmentList.size() - 1]);
// Double.toString renders whole numbers as "n.0"; strip the fraction back off.
if (maxId.endsWith(".0")) {
maxId = maxId.substring(0, maxId.indexOf("."));
}
return maxId;
}
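
// Illustrative results (hypothetical inputs):
//   getMaxSegmentID(Arrays.asList("0", "10", "9"))  -> "10"  (numeric order, "10.0" trimmed)
//   getMaxSegmentID(Arrays.asList("0", "1", "2.1")) -> "2.1" (fractional compaction id kept)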
}