| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.hadoop.api; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; |
| import org.apache.carbondata.core.datamap.DataMapFilter; |
| import org.apache.carbondata.core.datamap.DataMapStoreManager; |
| import org.apache.carbondata.core.datamap.Segment; |
| import org.apache.carbondata.core.datamap.TableDataMap; |
| import org.apache.carbondata.core.datastore.impl.FileFactory; |
| import org.apache.carbondata.core.indexstore.ExtendedBlocklet; |
| import org.apache.carbondata.core.indexstore.PartitionSpec; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.mutate.CarbonUpdateUtil; |
| import org.apache.carbondata.core.mutate.SegmentUpdateDetails; |
| import org.apache.carbondata.core.mutate.UpdateVO; |
| import org.apache.carbondata.core.mutate.data.BlockMappingVO; |
| import org.apache.carbondata.core.profiler.ExplainCollector; |
| import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope; |
| import org.apache.carbondata.core.readcommitter.ReadCommittedScope; |
| import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; |
| import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; |
| import org.apache.carbondata.core.statusmanager.FileFormat; |
| import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager; |
| import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; |
| import org.apache.carbondata.core.stream.StreamFile; |
| import org.apache.carbondata.core.stream.StreamPruner; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.hadoop.CarbonInputSplit; |
| |
| import org.apache.hadoop.fs.BlockLocation; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * InputFormat for reading carbondata files with table level metadata support, |
| * such as segment and explicit schema metadata. |
| * |
| * @param <T> |
| */ |
| public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { |
| |
| // comma separated list of input segment numbers |
| public static final String INPUT_SEGMENT_NUMBERS = |
| "mapreduce.input.carboninputformat.segmentnumbers"; |
| // comma separated list of input files |
| public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; |
| private static final Logger LOG = |
| LogServiceFactory.getLogService(CarbonTableInputFormat.class.getName()); |
| private static final String CARBON_TRANSACTIONAL_TABLE = |
| "mapreduce.input.carboninputformat.transactional"; |
| public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; |
| public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; |
  // a cached CarbonTable instance, used on the task side
| private CarbonTable carbonTable; |
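  // read committed scope, resolved lazily in getReadCommitted() and cached for this instance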
| private ReadCommittedScope readCommittedScope; |
| |
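  /*
   * Minimal usage sketch (illustrative only; `tablePath`, the database and the table names below
   * are placeholder assumptions, and the table schema must already exist at that path): configure
   * a Hadoop Job with the table path and table identity, then ask this InputFormat for splits.
   *
   *   Job job = Job.getInstance(new Configuration());
   *   org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(tablePath));
   *   job.getConfiguration().set(CarbonTableInputFormat.DATABASE_NAME, "default");
   *   job.getConfiguration().set(CarbonTableInputFormat.TABLE_NAME, "sample_table");
   *   List<InputSplit> splits = new CarbonTableInputFormat<Object>().getSplits(job);
   */
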
| /** |
| * {@inheritDoc} |
| * Configurations FileInputFormat.INPUT_DIR |
| * are used to get table path to read. |
| * |
| * @param job |
| * @return List<InputSplit> list of CarbonInputSplit |
| * @throws IOException |
| */ |
| @Override |
| public List<InputSplit> getSplits(JobContext job) throws IOException { |
| AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); |
| carbonTable = getOrCreateCarbonTable(job.getConfiguration()); |
| if (null == carbonTable) { |
| throw new IOException("Missing/Corrupt schema file for table."); |
| } |
| this.readCommittedScope = getReadCommitted(job, identifier); |
| LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); |
| |
| SegmentUpdateStatusManager updateStatusManager = |
| new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails); |
| List<String> invalidSegmentIds = new ArrayList<>(); |
| List<Segment> streamSegments = null; |
| // get all valid segments and set them into the configuration |
| SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier, |
| readCommittedScope.getConfiguration()); |
| SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager |
| .getValidAndInvalidSegments(carbonTable.isChildTableForMV(), loadMetadataDetails, |
| this.readCommittedScope); |
| |
| List<Segment> validSegments = segments.getValidSegments(); |
| streamSegments = segments.getStreamSegments(); |
| streamSegments = getFilteredSegment(job, streamSegments, readCommittedScope); |
| if (validSegments.size() == 0) { |
| return getSplitsOfStreaming(job, streamSegments, carbonTable); |
| } |
| List<Segment> filteredSegmentToAccess = |
| getFilteredSegment(job, segments.getValidSegments(), readCommittedScope); |
| if (filteredSegmentToAccess.size() == 0) { |
| return getSplitsOfStreaming(job, streamSegments, carbonTable); |
| } else { |
| setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess); |
| } |
| |
| // remove entry in the segment index if there are invalid segments |
| for (Segment segment : segments.getInvalidSegments()) { |
| invalidSegmentIds.add(segment.getSegmentNo()); |
| } |
| if (invalidSegmentIds.size() > 0) { |
| DataMapStoreManager.getInstance() |
| .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), |
| invalidSegmentIds); |
| } |
| |
| List<Segment> segmentToAccess = |
| getFilteredSegment(job, segments.getValidSegments(), readCommittedScope); |
| |
| // process and resolve the expression |
| DataMapFilter dataMapFilter = getFilterPredicates(job.getConfiguration()); |
| |
| if (dataMapFilter != null) { |
| dataMapFilter.resolve(false); |
| } |
| |
| // do block filtering and get split |
| List<InputSplit> splits = getSplits( |
| job, dataMapFilter, segmentToAccess, |
| updateStatusManager, segments.getInvalidSegments()); |
| |
| // add all splits of streaming |
| List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable); |
| if (!splitsOfStreaming.isEmpty()) { |
| splits.addAll(splitsOfStreaming); |
| } |
| return splits; |
| } |
| |
| /** |
| * Method to check and refresh segment cache |
| * |
| * @param job |
| * @param carbonTable |
| * @param updateStatusManager |
| * @param filteredSegmentToAccess |
| * @throws IOException |
| */ |
| |
| /** |
| * Return segment list after filtering out valid segments and segments set by user by |
| * `INPUT_SEGMENT_NUMBERS` in job configuration |
| */ |
| private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments, |
| ReadCommittedScope readCommittedScope) { |
| Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope); |
| List<Segment> segmentToAccessSet = |
| new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess))); |
| List<Segment> filteredSegmentToAccess = new ArrayList<>(); |
| if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) { |
| filteredSegmentToAccess.addAll(validSegments); |
| } else { |
| for (Segment validSegment : validSegments) { |
| int index = segmentToAccessSet.indexOf(validSegment); |
| if (index > -1) { |
          // For an in-progress segment that is being read, the segment file name comes from the
          // configured property, so prefer the user-specified segment over the valid one.
| if (segmentToAccessSet.get(index).getSegmentFileName() != null |
| && validSegment.getSegmentFileName() == null) { |
| filteredSegmentToAccess.add(segmentToAccessSet.get(index)); |
| } else { |
| filteredSegmentToAccess.add(validSegment); |
| } |
| } |
| } |
      if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
        // log the user-specified segments that are not part of the valid segment list
        List<Segment> ignoredSegments = new ArrayList<>(segmentToAccessSet);
        ignoredSegments.removeAll(filteredSegmentToAccess);
        LOG.info("Segments ignored are : " + Arrays.toString(ignoredSegments.toArray()));
      }
| } |
| return filteredSegmentToAccess; |
| } |
| |
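  /**
   * Get the splits of the streaming segments, resolving the filter from the job configuration.
   */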
| public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments, |
| CarbonTable carbonTable) throws IOException { |
| return getSplitsOfStreaming(job, streamSegments, carbonTable, null); |
| } |
| |
| /** |
| * use file list in .carbonindex file to get the split of streaming. |
| */ |
| public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments, |
| CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException { |
| List<InputSplit> splits = new ArrayList<InputSplit>(); |
| if (streamSegments != null && !streamSegments.isEmpty()) { |
| numStreamSegments = streamSegments.size(); |
| long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); |
| long maxSize = getMaxSplitSize(job); |
| if (filterResolverIntf == null) { |
| if (carbonTable != null) { |
| DataMapFilter filter = getFilterPredicates(job.getConfiguration()); |
| if (filter != null) { |
| filter.processFilterExpression(); |
| filterResolverIntf = filter.getResolver(); |
| } |
| } |
| } |
| StreamPruner streamPruner = new StreamPruner(carbonTable); |
| streamPruner.init(filterResolverIntf); |
| List<StreamFile> streamFiles = streamPruner.prune(streamSegments); |
      // record the number of stream files hit after pruning and the total number of stream files
| this.hitedStreamFiles = streamFiles.size(); |
| this.numStreamFiles = streamPruner.getTotalFileNums(); |
| for (StreamFile streamFile : streamFiles) { |
| Path path = new Path(streamFile.getFilePath()); |
| long length = streamFile.getFileSize(); |
| if (length != 0) { |
| BlockLocation[] blkLocations; |
| FileSystem fs = FileFactory.getFileSystem(path); |
| FileStatus file = fs.getFileStatus(path); |
| blkLocations = fs.getFileBlockLocations(path, 0, length); |
| long blockSize = file.getBlockSize(); |
| long splitSize = computeSplitSize(blockSize, minSize, maxSize); |
| long bytesRemaining = length; |
          // split the stream file into smaller splits;
          // allow 10% slop to avoid generating a very small split at the end
| while (((double) bytesRemaining) / splitSize > 1.1) { |
| int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); |
| splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(), |
| length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), |
| blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1)); |
| bytesRemaining -= splitSize; |
| } |
| if (bytesRemaining != 0) { |
| int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); |
| splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(), |
| length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), |
| blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1)); |
| } |
| } |
| } |
| } |
| return splits; |
| } |
| |
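  /**
   * Build a CarbonInputSplit for one part of a streaming file.
   */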
| protected FileSplit makeSplit(String segmentId, String filePath, long start, long length, |
| String[] hosts, String[] inMemoryHosts, FileFormat fileFormat) { |
| return new CarbonInputSplit(segmentId, filePath, start, length, hosts, inMemoryHosts, |
| fileFormat); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS |
| * are used to get table path to read. |
| * |
| * @return |
| * @throws IOException |
| */ |
| private List<InputSplit> getSplits(JobContext job, DataMapFilter expression, |
| List<Segment> validSegments, SegmentUpdateStatusManager updateStatusManager, |
| List<Segment> invalidSegments) throws IOException { |
| |
| List<String> segmentsToBeRefreshed = new ArrayList<>(); |
| if (!CarbonProperties.getInstance() |
| .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName())) { |
      // clear updated segments from the cache when updates have happened on them
| DataMapStoreManager.getInstance().refreshSegmentCacheIfRequired(carbonTable, |
| updateStatusManager, |
| validSegments); |
| } else { |
| segmentsToBeRefreshed = DataMapStoreManager.getInstance() |
| .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegments); |
| } |
| |
| numSegments = validSegments.size(); |
| List<InputSplit> result = new LinkedList<InputSplit>(); |
    UpdateVO invalidBlockVOForSegmentId = null;
    boolean isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
| |
| // for each segment fetch blocks matching filter in Driver BTree |
| List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment = |
| getDataBlocksOfSegment(job, carbonTable, expression, validSegments, |
| invalidSegments, segmentsToBeRefreshed); |
| numBlocks = dataBlocksOfSegment.size(); |
| for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) { |
| |
      // Get the UpdateVO for tables on which IUD operations have been performed.
| if (isIUDTable) { |
| invalidBlockVOForSegmentId = |
| updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId()); |
| } |
| String[] deleteDeltaFilePath = null; |
| if (isIUDTable) { |
        // The search for invalidated blocks is only needed when IUD has been performed on this
        // table; skip blocks that have been invalidated by an update or delete.
| if (CarbonUtil |
| .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getFilePath(), |
| invalidBlockVOForSegmentId, updateStatusManager)) { |
| continue; |
| } |
        // fetch the delete delta files for the block only when IUD has been performed
| try { |
| deleteDeltaFilePath = updateStatusManager |
| .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId()); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath); |
| result.add(inputSplit); |
| } |
| return result; |
| } |
| |
| /** |
| * return valid segment to access |
| */ |
| public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) { |
| String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); |
| if (segmentString.trim().isEmpty()) { |
| return new Segment[0]; |
| } |
| List<Segment> segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope); |
| return segments.toArray(new Segment[segments.size()]); |
| } |
| |
| /** |
| * Get the row count of the Block and mapping of segment and Block count. |
| */ |
| public BlockMappingVO getBlockRowCount(Job job, CarbonTable table, |
| List<PartitionSpec> partitions, boolean isUpdateFlow) throws IOException { |
    // The normal query flow goes through CarbonInputFormat#getPrunedBlocklets and initializes the
    // pruning info for the queried table. A count(*) query without a filter uses a different query
    // plan, so no pruning info is initialized; when the default data map is asked to prune with a
    // null filter, an exception would occur while setting the pruning info. Since such a query
    // yields no useful block/blocklet pruning information (no pruning actually happens), the
    // explain collector is disabled here.
| ExplainCollector.remove(); |
| |
| AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier(); |
| |
| ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier); |
| LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); |
| |
| SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager( |
| table, loadMetadataDetails); |
| SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments = |
| new SegmentStatusManager(identifier, readCommittedScope.getConfiguration()) |
| .getValidAndInvalidSegments(table.isChildTableForMV(), loadMetadataDetails, |
| readCommittedScope); |
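    // blockRowCountMapping: block key (segment id + block name) -> row count of that block
    // segmentAndBlockCountMapping: segment id -> number of blocks in that segment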
| Map<String, Long> blockRowCountMapping = new HashMap<>(); |
| Map<String, Long> segmentAndBlockCountMapping = new HashMap<>(); |
| |
| // TODO: currently only batch segment is supported, add support for streaming table |
| List<Segment> filteredSegment = |
| getFilteredSegment(job, allSegments.getValidSegments(), readCommittedScope); |
| boolean isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); |
    /* In the select * flow, getSplits() clears the segment map when a segment needs refreshing;
       the same is needed for the select count(*) flow. For a non-transactional table, one reason
       for a segment refresh is the following scenario: the SDK writes one set of files with a
       UUID and may write again with the same UUID, so the segment must be refreshed for the row
       count to reflect the latest file content. */
| List<String> toBeCleanedSegments = new ArrayList<>(); |
| for (Segment eachSegment : filteredSegment) { |
| boolean refreshNeeded = DataMapStoreManager.getInstance() |
| .getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration())) |
| .isRefreshNeeded(eachSegment, |
| updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo())); |
| if (refreshNeeded) { |
| toBeCleanedSegments.add(eachSegment.getSegmentNo()); |
| } |
| } |
| for (Segment segment : allSegments.getInvalidSegments()) { |
| // remove entry in the segment index if there are invalid segments |
| toBeCleanedSegments.add(segment.getSegmentNo()); |
| } |
| if (toBeCleanedSegments.size() > 0) { |
| DataMapStoreManager.getInstance() |
| .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), |
| toBeCleanedSegments); |
| } |
| if (isIUDTable || isUpdateFlow) { |
| Map<String, Long> blockletToRowCountMap = new HashMap<>(); |
| if (CarbonProperties.getInstance() |
| .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) { |
| try { |
| List<ExtendedBlocklet> extendedBlocklets = |
| getDistributedBlockRowCount(table, partitions, filteredSegment, |
| allSegments.getInvalidSegments(), toBeCleanedSegments); |
| for (ExtendedBlocklet blocklet : extendedBlocklets) { |
| String filePath = blocklet.getFilePath().replace("\\", "/"); |
| String blockName = filePath.substring(filePath.lastIndexOf("/") + 1); |
| blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName, |
| blocklet.getRowCount()); |
| } |
| } catch (Exception e) { |
          // If fallback is disabled, throw the exception directly; otherwise fall back to
          // driver-side pruning.
| if (CarbonProperties.getInstance().isFallBackDisabled()) { |
| throw e; |
| } |
| TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table); |
| blockletToRowCountMap |
| .putAll(defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap)); |
| } |
| } else { |
| TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table); |
| blockletToRowCountMap |
| .putAll(defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap)); |
| } |
      // key is (segmentId + "," + blockName) and value is the row count of that block
| for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) { |
| String[] segmentIdAndPath = eachBlocklet.getKey().split(",", 2); |
| String segmentId = segmentIdAndPath[0]; |
| String blockName = segmentIdAndPath[1]; |
| |
| long rowCount = eachBlocklet.getValue(); |
| |
| String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName); |
| |
| // if block is invalid then don't add the count |
| SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key); |
| |
| if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) { |
| Long blockCount = blockRowCountMapping.get(key); |
| if (blockCount == null) { |
| blockCount = 0L; |
| Long count = segmentAndBlockCountMapping.get(segmentId); |
| if (count == null) { |
| count = 0L; |
| } |
| segmentAndBlockCountMapping.put(segmentId, count + 1); |
| } |
| blockCount += rowCount; |
| blockRowCountMapping.put(key, blockCount); |
| } |
| } |
| } else { |
| long totalRowCount; |
| if (CarbonProperties.getInstance() |
| .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) { |
| totalRowCount = |
| getDistributedCount(table, partitions, filteredSegment); |
| } else { |
| TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table); |
| totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap); |
| } |
| blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount); |
| } |
| return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping); |
| } |
| |
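  /**
   * Resolve and cache the read committed scope: table-status based for transactional tables,
   * otherwise the scope from the configuration or, failing that, the latest files on disk.
   */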
| public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifier identifier) |
| throws IOException { |
| if (readCommittedScope == null) { |
| ReadCommittedScope readCommittedScope; |
| if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) { |
| readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration()); |
| } else { |
| readCommittedScope = getReadCommittedScope(job.getConfiguration()); |
| if (readCommittedScope == null) { |
| readCommittedScope = |
| new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration()); |
| } |
| } |
| this.readCommittedScope = readCommittedScope; |
| } |
| return readCommittedScope; |
| } |
| } |