| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.index; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.datastore.impl.FileFactory; |
| import org.apache.carbondata.core.index.dev.Index; |
| import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper; |
| import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper; |
| import org.apache.carbondata.core.indexstore.ExtendedBlocklet; |
| import org.apache.carbondata.core.indexstore.PartitionSpec; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope; |
| import org.apache.carbondata.core.readcommitter.ReadCommittedScope; |
| import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; |
| import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager; |
| import org.apache.carbondata.core.util.BlockletIndexUtil; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.ObjectSerializationUtil; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.log4j.Logger; |
| |
| public class IndexUtil { |
| |
| private static final String INDEX_DSTR = "mapreduce.input.carboninputformat.indexdstr"; |
| |
| public static final String EMBEDDED_JOB_NAME = |
| "org.apache.carbondata.indexserver.EmbeddedIndexJob"; |
| |
| public static final String DISTRIBUTED_JOB_NAME = |
| "org.apache.carbondata.indexserver.DistributedIndexJob"; |
| |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(IndexUtil.class.getName()); |
| |
| /** |
| * Creates instance for the Index Job class |
| * |
| * @param className |
| * @return |
| */ |
| public static Object createIndexJob(String className) { |
| try { |
| return Class.forName(className).getDeclaredConstructors()[0].newInstance(); |
| } catch (Exception e) { |
| LOGGER.error(e.getMessage(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * This method sets the indexJob in the configuration |
| * @param configuration |
| * @param IndexJob |
| * @throws IOException |
| */ |
| public static void setIndexJob(Configuration configuration, Object IndexJob) |
| throws IOException { |
| if (IndexJob != null) { |
| String toString = ObjectSerializationUtil.convertObjectToString(IndexJob); |
| configuration.set(INDEX_DSTR, toString); |
| } |
| } |
| |
| /** |
| * get index job from the configuration |
| * @param configuration job configuration |
| * @return Index Job |
| * @throws IOException |
| */ |
| public static IndexJob getIndexJob(Configuration configuration) throws IOException { |
| String jobString = configuration.get(INDEX_DSTR); |
| if (jobString != null) { |
| return (IndexJob) ObjectSerializationUtil.convertStringToObject(jobString); |
| } |
| return null; |
| } |
| |
| /** |
| * This method gets the indexJob and call execute , this job will be launched before clearing |
| * indexes from driver side during drop table and drop index and clears the index in executor |
| * side |
| * @param carbonTable |
| * @throws IOException |
| */ |
| private static void executeClearIndexJob(IndexJob indexJob, |
| CarbonTable carbonTable, String indexToClear) throws IOException { |
| IndexInputFormat indexInputFormat; |
| if (!carbonTable.isTransactionalTable()) { |
| ReadCommittedScope readCommittedScope = |
| new LatestFilesReadCommittedScope(carbonTable.getTablePath(), |
| FileFactory.getConfiguration()); |
| LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); |
| List<Segment> listOfValidSegments = new ArrayList<>(loadMetadataDetails.length); |
| Arrays.stream(loadMetadataDetails).forEach(segment -> { |
| Segment seg = new Segment(segment.getLoadName(), segment.getSegmentFile()); |
| seg.setLoadMetadataDetails(segment); |
| listOfValidSegments.add(seg); |
| }); |
| indexInputFormat = |
| new IndexInputFormat(carbonTable, listOfValidSegments, new ArrayList<>(0), true, |
| indexToClear); |
| } else { |
| SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo = |
| getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration()); |
| List<String> invalidSegment = new ArrayList<>(); |
| validAndInvalidSegmentsInfo.getInvalidSegments() |
| .forEach(segment -> invalidSegment.add(segment.getSegmentNo())); |
| indexInputFormat = |
| new IndexInputFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(), |
| invalidSegment, true, indexToClear); |
| } |
| try { |
| indexJob.execute(indexInputFormat, null); |
| } catch (Exception e) { |
| // Consider a scenario where clear index job is called from drop table |
| // and index server crashes, in this no exception should be thrown and |
| // drop table should complete. |
| LOGGER.error("Failed to execute Index clear Job", e); |
| } |
| } |
| |
| public static void executeClearIndexJob(CarbonTable carbonTable, String jobClassName) |
| throws IOException { |
| executeClearIndexJob(carbonTable, jobClassName, ""); |
| } |
| |
| static void executeClearIndexJob(CarbonTable carbonTable, String jobClassName, |
| String indexToClear) throws IOException { |
| IndexJob indexJob = (IndexJob) createIndexJob(jobClassName); |
| if (indexJob == null) { |
| return; |
| } |
| executeClearIndexJob(indexJob, carbonTable, indexToClear); |
| } |
| |
| public static IndexJob getEmbeddedJob() { |
| IndexJob indexJob = (IndexJob) IndexUtil.createIndexJob(EMBEDDED_JOB_NAME); |
| if (indexJob == null) { |
| throw new ExceptionInInitializerError("Unable to create EmbeddedIndexJob"); |
| } |
| return indexJob; |
| } |
| |
| /** |
| * Prune the segments from the already pruned blocklets. |
| */ |
| public static void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) { |
| Map<Segment, Set<String>> validSegments = new HashMap<>(); |
| for (ExtendedBlocklet blocklet : prunedBlocklets) { |
| // Set the pruned index file to the segment |
| // for further pruning. |
| String shardName = CarbonTablePath.getShardName(blocklet.getFilePath()); |
| // Add the existing shards to corresponding segments |
| Set<String> existingShards = validSegments.get(blocklet.getSegment()); |
| if (existingShards == null) { |
| existingShards = new HashSet<>(); |
| validSegments.put(blocklet.getSegment(), existingShards); |
| } |
| existingShards.add(shardName); |
| } |
| // override the shards list in the segments. |
| for (Map.Entry<Segment, Set<String>> entry : validSegments.entrySet()) { |
| entry.getKey().setFilteredIndexShardNames(entry.getValue()); |
| } |
| segments.clear(); |
| // add the new segments to the segments list. |
| segments.addAll(validSegments.keySet()); |
| } |
| |
| /** |
| |
| Loads the indexes in parallel by utilizing executor |
| * |
| @param carbonTable |
| @param indexExprWrapper |
| @param validSegments |
| */ |
| public static void loadIndexes(CarbonTable carbonTable, IndexExprWrapper indexExprWrapper, |
| List<Segment> validSegments) { |
| if (!CarbonProperties.getInstance() |
| .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName()) |
| && BlockletIndexUtil.loadIndexesParallel(carbonTable)) { |
| String clsName = "org.apache.spark.sql.secondaryindex.jobs.SparkBlockletIndexLoaderJob"; |
| IndexJob indexJob = (IndexJob) createIndexJob(clsName); |
| String className = "org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"; |
| FileInputFormat indexFormat = |
| createIndexJob(carbonTable, indexExprWrapper, validSegments, className); |
| indexJob.execute(carbonTable, indexFormat); |
| } |
| } |
| |
| public static Object[] getPositionReferences(String sql) { |
| IndexJob indexJob = (IndexJob) createIndexJob( |
| "org.apache.spark.sql.secondaryindex.jobs.StringProjectionQueryJob"); |
| return indexJob.execute(sql); |
| } |
| |
| private static FileInputFormat createIndexJob(CarbonTable carbonTable, |
| IndexExprWrapper indexExprWrapper, List<Segment> validSegments, String clsName) { |
| try { |
| Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0]; |
| return (FileInputFormat) cons |
| .newInstance(carbonTable, indexExprWrapper, validSegments); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| static List<ExtendedBlocklet> pruneIndexes(CarbonTable table, |
| FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad, |
| List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, |
| IndexChooser indexChooser) throws IOException { |
| if (null == indexChooser) { |
| return blocklets; |
| } |
| pruneSegments(segmentsToLoad, blocklets); |
| List<ExtendedBlocklet> cgIndexes = pruneIndexes(table, filterResolverIntf, segmentsToLoad, |
| partitions, blocklets, |
| IndexLevel.CG, indexChooser); |
| pruneSegments(segmentsToLoad, cgIndexes); |
| return pruneIndexes(table, filterResolverIntf, segmentsToLoad, |
| partitions, cgIndexes, |
| IndexLevel.FG, indexChooser); |
| } |
| |
| static List<ExtendedBlocklet> pruneIndexes(CarbonTable table, |
| FilterResolverIntf filterResolverIntf, List<Segment> segmentsToLoad, |
| List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets, IndexLevel indexLevel, |
| IndexChooser indexChooser) |
| throws IOException { |
| IndexExprWrapper indexExprWrapper = |
| indexChooser.chooseIndex(indexLevel, filterResolverIntf); |
| if (indexExprWrapper != null) { |
| List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>(); |
| // Prune segments from already pruned blocklets |
| for (IndexInputSplitWrapper wrapper : indexExprWrapper |
| .toDistributable(segmentsToLoad)) { |
| TableIndex index = IndexStoreManager.getInstance() |
| .getIndex(table, wrapper.getDistributable().getIndexSchema()); |
| List<Index> indices = index.getTableIndexes(wrapper.getDistributable()); |
| List<ExtendedBlocklet> prunedBlocklet = new ArrayList<>(); |
| if (table.isTransactionalTable()) { |
| prunedBlocklet.addAll(index.prune(indices, wrapper.getDistributable(), |
| indexExprWrapper.getFilterResolverIntf(wrapper.getUniqueId()), partitions)); |
| } else { |
| prunedBlocklet |
| .addAll(index.prune(segmentsToLoad, new IndexFilter(filterResolverIntf), |
| partitions)); |
| } |
| // For all blocklets initialize the detail info so that it can be serialized to the driver. |
| for (ExtendedBlocklet blocklet : prunedBlocklet) { |
| blocklet.getDetailInfo(); |
| blocklet.setIndexUniqueId(wrapper.getUniqueId()); |
| blocklet.setCgIndexPresent(true); |
| } |
| extendedBlocklets.addAll(prunedBlocklet); |
| } |
| return indexExprWrapper.pruneBlocklets(extendedBlocklets); |
| } |
| return blocklets; |
| } |
| |
| /** |
| * this method gets the indexJob and call execute of that job, this will be launched for |
| * distributed CG or FG |
| * @return list of Extended blocklets after pruning |
| */ |
| public static List<ExtendedBlocklet> executeIndexJob(CarbonTable carbonTable, |
| FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune, |
| List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level, |
| List<String> segmentsToBeRefreshed, Configuration configuration) { |
| return executeIndexJob(carbonTable, resolver, indexJob, partitionsToPrune, validSegments, |
| invalidSegments, level, false, segmentsToBeRefreshed, false, false, configuration, null); |
| } |
| |
| /** |
| * this method gets the indexJob and call execute of that job, this will be launched for |
| * distributed CG or FG |
| * @return list of Extended blocklets after pruning |
| */ |
| public static List<ExtendedBlocklet> executeIndexJob(CarbonTable carbonTable, |
| FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune, |
| List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level, |
| Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob, |
| boolean isSIPruningEnabled, Configuration configuration, Set<String> missingSISegments) { |
| List<String> invalidSegmentNo = new ArrayList<>(); |
| for (Segment segment : invalidSegments) { |
| invalidSegmentNo.add(segment.getSegmentNo()); |
| } |
| invalidSegmentNo.addAll(segmentsToBeRefreshed); |
| IndexInputFormat indexInputFormat = |
| new IndexInputFormat(carbonTable, resolver, validSegments, invalidSegmentNo, |
| partitionsToPrune, false, level, isFallbackJob, false); |
| if (missingSISegments != null) { |
| indexInputFormat.setMissingSISegments(missingSISegments); |
| } |
| if (isCountJob) { |
| indexInputFormat.setCountStarJob(); |
| indexInputFormat.setIsWriteToFile(false); |
| } |
| indexInputFormat.setSIPruningEnabled(isSIPruningEnabled); |
| return indexJob.execute(indexInputFormat, configuration); |
| } |
| |
| public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments( |
| CarbonTable carbonTable, Configuration configuration) throws IOException { |
| SegmentStatusManager ssm = |
| new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration, |
| carbonTable.getTableStatusVersion()); |
| return ssm.getValidAndInvalidSegments(carbonTable.isMV()); |
| } |
| |
| } |