blob: bd0f5d1eb835ef10f9209cfab07b4e8656fc651b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.api;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.carbondata.common.exceptions.DeprecatedFeatureException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.IndexChooser;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.index.IndexUtil;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.index.TableIndex;
import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.statusmanager.StageInputCollector;
import org.apache.carbondata.core.stream.StreamFile;
import org.apache.carbondata.core.stream.StreamPruner;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
/**
* InputFormat for reading carbondata files with table level metadata support,
* such as segment and explicit schema metadata.
*
* @param <T>
*/
public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
// comma separated list of input segment numbers
public static final String INPUT_SEGMENT_NUMBERS =
"mapreduce.input.carboninputformat.segmentnumbers";
// comma separated list of input files
public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
private static final Logger LOG =
LogServiceFactory.getLogService(CarbonTableInputFormat.class.getName());
protected static final String CARBON_TRANSACTIONAL_TABLE =
"mapreduce.input.carboninputformat.transactional";
public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
public static final String UPDATE_DELTA_VERSION = "updateDeltaVersion";
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
private ReadCommittedScope readCommittedScope;
/**
* get list of block/blocklet and make them to CarbonInputSplit
* @param job JobContext with Configuration
* @return list of CarbonInputSplit
* @throws IOException
*/
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
carbonTable = getOrCreateCarbonTable(job.getConfiguration());
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
// global dictionary is not supported since 2.0
if (carbonTable.getTableInfo().getFactTable().getTableProperties().containsKey(
CarbonCommonConstants.DICTIONARY_INCLUDE)) {
DeprecatedFeatureException.globalDictNotSupported();
}
List<InputSplit> splits = new LinkedList<>();
if (CarbonProperties.isQueryStageInputEnabled()) {
// If there are stage files, collect them and create splits so that they are
// included for the query
try {
List<InputSplit> stageInputSplits =
StageInputCollector.createInputSplits(carbonTable, job.getConfiguration());
splits.addAll(stageInputSplits);
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to create input splits from stage files", e);
throw new IOException(e);
}
}
this.readCommittedScope = getReadCommitted(job, carbonTable.getAbsoluteTableIdentifier());
LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
String updateDeltaVersion = job.getConfiguration().get(UPDATE_DELTA_VERSION);
SegmentUpdateStatusManager updateStatusManager;
if (updateDeltaVersion != null) {
updateStatusManager =
new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails, updateDeltaVersion);
} else {
updateStatusManager =
new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
}
List<String> invalidSegmentIds = new ArrayList<>();
List<Segment> streamSegments = null;
// get all valid segments and set them into the configuration
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
readCommittedScope.getConfiguration());
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
.getValidAndInvalidSegments(carbonTable.isMV(), loadMetadataDetails,
this.readCommittedScope);
if (getValidateSegmentsToAccess(job.getConfiguration())) {
List<Segment> validSegments = segments.getValidSegments();
streamSegments = segments.getStreamSegments();
streamSegments = getFilteredSegment(job, streamSegments, true, readCommittedScope);
if (validSegments.size() == 0) {
splits.addAll(getSplitsOfStreaming(job, streamSegments, carbonTable));
return splits;
}
List<Segment> filteredSegmentToAccess =
getFilteredSegment(job, segments.getValidSegments(), true, readCommittedScope);
if (filteredSegmentToAccess.size() == 0) {
splits.addAll(getSplitsOfStreaming(job, streamSegments, carbonTable));
return splits;
} else {
setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
}
// remove entry in the segment index if there are invalid segments
for (Segment segment : segments.getInvalidSegments()) {
invalidSegmentIds.add(segment.getSegmentNo());
}
if (invalidSegmentIds.size() > 0) {
IndexStoreManager.getInstance()
.clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
invalidSegmentIds);
}
}
List<Segment> validAndInProgressSegments = new ArrayList<>(segments.getValidSegments());
// Add in progress segments also to filter it as in case of Secondary Index table load it loads
// data from in progress table.
validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
List<Segment> segmentToAccess =
getFilteredSegment(job, validAndInProgressSegments, false, readCommittedScope);
String segmentFileName = job.getConfiguration().get(CarbonCommonConstants.CURRENT_SEGMENTFILE);
if (segmentFileName != null) {
//per segment it has only one file("current.segment")
segmentToAccess.get(0).setSegmentFileName(segmentFileName + CarbonTablePath.SEGMENT_EXT);
}
// process and resolve the expression
IndexFilter indexFilter = getFilterPredicates(job.getConfiguration());
if (indexFilter != null) {
indexFilter.resolve(false);
}
// do block filtering and get split
List<InputSplit> batchSplits = getSplits(
job, indexFilter, segmentToAccess,
updateStatusManager, segments.getInvalidSegments());
splits.addAll(batchSplits);
// add all splits of streaming
List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable);
if (!splitsOfStreaming.isEmpty()) {
splits.addAll(splitsOfStreaming);
}
return splits;
}
/**
* Return segment list after filtering out valid segments and segments set by user by
* `INPUT_SEGMENT_NUMBERS` in job configuration
*/
private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
boolean validationRequired, ReadCommittedScope readCommittedScope) {
Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
return validSegments;
}
Map<String, Segment> segmentToAccessMap = Arrays.stream(segmentsToAccess)
.collect(Collectors.toMap(Segment::getSegmentNo, segment -> segment, (e1, e2) -> e1));
Map<String, Segment> filteredSegmentToAccess = new HashMap<>(segmentToAccessMap.size());
for (Segment validSegment : validSegments) {
String segmentNoOfValidSegment = validSegment.getSegmentNo();
if (segmentToAccessMap.containsKey(segmentNoOfValidSegment)) {
Segment segmentToAccess = segmentToAccessMap.get(segmentNoOfValidSegment);
if (segmentToAccess.getSegmentFileName() != null &&
validSegment.getSegmentFileName() == null) {
validSegment = segmentToAccess;
}
filteredSegmentToAccess.put(segmentNoOfValidSegment, validSegment);
}
}
if (!validationRequired && filteredSegmentToAccess.size() != segmentToAccessMap.size()) {
for (Segment segment : segmentToAccessMap.values()) {
if (!filteredSegmentToAccess.containsKey(segment.getSegmentNo())) {
filteredSegmentToAccess.put(segment.getSegmentNo(), segment);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Segments ignored are : " +
Arrays.toString(Sets.difference(new HashSet<>(filteredSegmentToAccess.values()),
new HashSet<>(segmentToAccessMap.values())).toArray()));
}
return new ArrayList<>(filteredSegmentToAccess.values());
}
public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
CarbonTable carbonTable) throws IOException {
return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
}
/**
* use file list in .carbonindex file to get the split of streaming.
*/
public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
List<InputSplit> splits = new ArrayList<>();
if (streamSegments != null && !streamSegments.isEmpty()) {
numStreamSegments = streamSegments.size();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
if (filterResolverIntf == null) {
if (carbonTable != null) {
IndexFilter filter = getFilterPredicates(job.getConfiguration());
if (filter != null) {
filter.processFilterExpression();
filterResolverIntf = filter.getResolver();
}
}
}
StreamPruner streamPruner = new StreamPruner(carbonTable);
streamPruner.init(filterResolverIntf);
List<StreamFile> streamFiles = streamPruner.prune(streamSegments);
// record the hit information of the streaming files
this.hitStreamFiles = streamFiles.size();
this.numStreamFiles = streamPruner.getTotalFileNums();
for (StreamFile streamFile : streamFiles) {
Path path = new Path(streamFile.getFilePath());
long length = streamFile.getFileSize();
if (length != 0) {
BlockLocation[] blkLocations;
FileSystem fs = FileFactory.getFileSystem(path);
FileStatus file = fs.getFileStatus(path);
blkLocations = fs.getFileBlockLocations(path, 0, length);
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// split the stream file to small splits
// there is 10% slop to avoid to generate very small split in the end
while (((double) bytesRemaining) / splitSize > 1.1) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(),
length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(),
length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
}
}
}
}
return splits;
}
protected FileSplit makeSplit(String segmentId, String filePath, long start, long length,
String[] hosts, String[] inMemoryHosts, FileFormat fileFormat) {
return new CarbonInputSplit(segmentId, filePath, start, length, hosts, inMemoryHosts,
fileFormat);
}
/**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
* are used to get table path to read.
*
* @return
* @throws IOException
*/
private List<InputSplit> getSplits(JobContext job, IndexFilter expression,
List<Segment> validSegments, SegmentUpdateStatusManager updateStatusManager,
List<Segment> invalidSegments) throws IOException {
List<String> segmentsToBeRefreshed = new ArrayList<>();
if (!CarbonProperties.getInstance()
.isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName())) {
// Clean the updated segments from memory if the update happens on segments
IndexStoreManager.getInstance().refreshSegmentCacheIfRequired(carbonTable,
updateStatusManager,
validSegments);
} else {
segmentsToBeRefreshed = IndexStoreManager.getInstance()
.getSegmentsToBeRefreshed(carbonTable, validSegments);
}
numSegments = validSegments.size();
List<InputSplit> result = new LinkedList<>();
UpdateVO invalidBlockVOForSegmentId = null;
boolean isIUDTable;
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
// for each segment fetch blocks matching filter in Driver BTree
List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
getDataBlocksOfSegment(job, carbonTable, expression, validSegments,
invalidSegments, segmentsToBeRefreshed);
numBlocks = dataBlocksOfSegment.size();
for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
// Get the UpdateVO for those tables on which IUD operations being performed.
if (isIUDTable) {
invalidBlockVOForSegmentId = SegmentUpdateStatusManager
.getInvalidTimestampRange(inputSplit.getSegment().getLoadMetadataDetails());
}
String[] deleteDeltaFilePath = null;
if (isIUDTable) {
// In case IUD is not performed in this table avoid searching for
// invalidated blocks.
if (CarbonUtil
.isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getFilePath(),
invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
// When iud is done then only get delete delta files for a block
try {
deleteDeltaFilePath = updateStatusManager
.getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
} catch (Exception e) {
throw new IOException(e);
}
}
inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
result.add(inputSplit);
}
return result;
}
/**
* return valid segment to access
*/
public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) {
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
if (segmentString.trim().isEmpty()) {
return new Segment[0];
}
List<Segment> segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope);
return segments.toArray(new Segment[segments.size()]);
}
/**
* Get the row count of the Block and mapping of segment and Block count.
*/
public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
List<PartitionSpec> partitions, boolean isUpdateFlow) throws IOException {
// Normal query flow goes to CarbonInputFormat#getPrunedBlocklets and initialize the
// pruning info for table we queried. But here count star query without filter uses a different
// query plan, and no pruning info is initialized. When it calls default index to
// prune(with a null filter), exception will occur during setting pruning info.
// Considering no useful information about block/blocklet pruning for such query
// (actually no pruning), so we disable explain collector here
ExplainCollector.remove();
AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
.getValidAndInvalidSegments(table.isMV(), loadMetadataDetails,
readCommittedScope);
Map<String, Long> blockRowCountMapping = new HashMap<>();
Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
Map<String, String> blockToSegmentMapping = new HashMap<>();
// TODO: currently only batch segment is supported, add support for streaming table
List<Segment> filteredSegment =
getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
boolean isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
/* In the select * flow, getSplits() method was clearing the segmentMap if,
segment needs refreshing. same thing need for select count(*) flow also.
For NonTransactional table, one of the reason for a segment refresh is below scenario.
SDK is written one set of files with UUID, with same UUID it can write again.
So, latest files content should reflect the new count by refreshing the segment */
List<String> toBeCleanedSegments = new ArrayList<>();
for (Segment segment : filteredSegment) {
boolean refreshNeeded = IndexStoreManager.getInstance()
.getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration()))
.isRefreshNeeded(segment, SegmentUpdateStatusManager
.getInvalidTimestampRange(segment.getLoadMetadataDetails()));
if (refreshNeeded) {
toBeCleanedSegments.add(segment.getSegmentNo());
}
}
for (Segment segment : allSegments.getInvalidSegments()) {
// remove entry in the segment index if there are invalid segments
toBeCleanedSegments.add(segment.getSegmentNo());
}
if (toBeCleanedSegments.size() > 0) {
IndexStoreManager.getInstance()
.clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
toBeCleanedSegments);
}
IndexExprWrapper indexExprWrapper =
IndexChooser.getDefaultIndex(getOrCreateCarbonTable(job.getConfiguration()), null);
IndexUtil.loadIndexes(table, indexExprWrapper, filteredSegment);
if (isIUDTable || isUpdateFlow) {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
try {
List<ExtendedBlocklet> extendedBlocklets =
getDistributedBlockRowCount(table, partitions, filteredSegment,
allSegments.getInvalidSegments(), toBeCleanedSegments);
for (ExtendedBlocklet blocklet : extendedBlocklets) {
String filePath = blocklet.getFilePath().replace("\\", "/");
String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
blocklet.getRowCount());
}
} catch (Exception e) {
// Check if fallback is disabled then directly throw exception otherwise try driver
// pruning.
if (CarbonProperties.getInstance().isFallBackDisabled()) {
throw e;
}
TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(table);
blockletToRowCountMap
.putAll(defaultIndex.getBlockRowCount(filteredSegment, partitions, defaultIndex));
}
} else {
TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(table);
blockletToRowCountMap
.putAll(defaultIndex.getBlockRowCount(filteredSegment, partitions, defaultIndex));
}
// key is the (segmentId","+blockletPath) and key is the row count of that blocklet
for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
String[] segmentIdAndPath = eachBlocklet.getKey().split(",", 2);
String segmentId = segmentIdAndPath[0];
String blockName = segmentIdAndPath[1];
long rowCount = eachBlocklet.getValue();
String key = CarbonUpdateUtil
.getSegmentBlockNameKey(segmentId, blockName, table.isHivePartitionTable());
// if block is invalid then don't add the count
SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
Long blockCount = blockRowCountMapping.get(key);
if (blockCount == null) {
blockCount = 0L;
Long count = segmentAndBlockCountMapping.get(segmentId);
if (count == null) {
count = 0L;
}
segmentAndBlockCountMapping.put(segmentId, count + 1);
}
blockToSegmentMapping.put(key, segmentId);
blockCount += rowCount;
blockRowCountMapping.put(key, blockCount);
}
}
} else {
long totalRowCount;
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
totalRowCount =
getDistributedCount(table, partitions, filteredSegment);
} else {
TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(table);
totalRowCount = defaultIndex.getRowCount(filteredSegment, partitions, defaultIndex);
}
blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
}
BlockMappingVO blockMappingVO =
new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
blockMappingVO.setBlockToSegmentMapping(blockToSegmentMapping);
return blockMappingVO;
}
public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifier identifier)
throws IOException {
if (readCommittedScope == null) {
ReadCommittedScope readCommittedScope;
if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration());
} else {
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
readCommittedScope =
new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration());
}
}
this.readCommittedScope = readCommittedScope;
}
return readCommittedScope;
}
public void setReadCommittedScope(ReadCommittedScope readCommittedScope) {
this.readCommittedScope = readCommittedScope;
}
}