[CARBONDATA-3889] Cleanup code for carbondata-hadoop module
Why is this PR needed?
The carbondata-hadoop module has accumulated code that needs cleanup: unused imports and methods, pre-Java-8 idioms (anonymous classes, index-based loops, explicit generic type arguments), non-final fields, and typos in identifiers and comments.
What changes were proposed in this PR?
Clean up code in the carbondata-hadoop module: remove unused imports, constants, and methods; replace anonymous classes with lambdas and `Comparator` factories; use enhanced for loops and the diamond operator; mark fields final where possible; and fix misspelled identifiers (for example UPADTE_TIMESTAMP to UPDATE_TIMESTAMP, getHitedStreamFiles to getHitStreamFiles) and Javadoc typos.
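For reference, a minimal self-contained sketch of the recurring rewrite patterns visible in the diff below (the class and names here are hypothetical, not code from the module): final fields, `Arrays.asList`/`addAll` instead of manual copy loops, enhanced for loops, the diamond operator, and a lambda in place of an anonymous `Comparator`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CleanupPatterns {

  // fields assigned only in the constructor become final
  private final Set<String> columns = new LinkedHashSet<>();

  public CleanupPatterns(String[] columnNames) {
    // manual element-by-element copy loop replaced by a collections utility
    columns.addAll(Arrays.asList(columnNames));
  }

  public List<String> sortedCopy(List<String> paths) {
    // explicit generic type arguments replaced by the diamond operator
    List<String> copy = new ArrayList<>(paths);
    // anonymous Comparator class replaced by a Comparator factory / lambda
    copy.sort(Comparator.comparing(String::toLowerCase));
    return copy;
  }

  public void printAll() {
    // index-based loop replaced by an enhanced for loop
    for (String column : columns) {
      System.out.println(column);
    }
  }

  public static void main(String[] args) {
    CleanupPatterns patterns = new CleanupPatterns(new String[] {"id", "name"});
    patterns.printAll();
    System.out.println(patterns.sortedCopy(Arrays.asList("B", "a")));
  }
}
```

These rewrites are behavior-preserving; the diff below applies the same patterns to the actual classes in carbondata-hadoop.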
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3827
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
index 5923ab7..15898d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -31,9 +31,7 @@
protected int rowCount = 0;
/**
- * This method will log query result count and querytime
- * @param recordCount
- * @param recorder
+ * This method will log query result count and query time
*/
public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) {
// result size
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 64901cf..f6a11a2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -139,8 +139,8 @@
}
getLocationIfNull();
out.writeInt(locations.length);
- for (int i = 0; i < locations.length; i++) {
- out.writeUTF(locations[i]);
+ for (String location : locations) {
+ out.writeUTF(location);
}
out.writeInt(fileFormat.ordinal());
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
index 13ccaa9..12c03c0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.hadoop;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
@@ -29,16 +30,14 @@
private static final long serialVersionUID = -4328676723039530713L;
- private Set<String> columns = new LinkedHashSet<>();
+ private final Set<String> columns = new LinkedHashSet<>();
public CarbonProjection() {
}
public CarbonProjection(String[] columnNames) {
Objects.requireNonNull(columnNames);
- for (String columnName : columnNames) {
- columns.add(columnName);
- }
+ columns.addAll(Arrays.asList(columnNames));
}
public void addColumn(String column) {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 943bd76..2c5c821 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -43,13 +43,13 @@
*/
public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
- protected QueryModel queryModel;
+ protected final QueryModel queryModel;
- protected CarbonReadSupport<T> readSupport;
+ protected final CarbonReadSupport<T> readSupport;
protected CarbonIterator<Object[]> carbonIterator;
- protected QueryExecutor queryExecutor;
+ protected final QueryExecutor queryExecutor;
private InputMetricsStats inputMetricsStats;
/**
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 5568a29..9c599ef 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -28,7 +27,6 @@
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.Segment;
@@ -94,12 +92,9 @@
}
/**
- * {@inheritDoc}
- * Configurations FileInputFormat.INPUT_DIR
- * are used to get table path to read.
- *
- * @param job
- * @return List<InputSplit> list of CarbonInputSplit
+ * get list of block/blocklet and make them to CarbonInputSplit
+ * @param job JobContext with Configuration
+ * @return list of CarbonInputSplit
* @throws IOException
*/
@Override
@@ -114,7 +109,7 @@
// check for externalTable segment (Segment_null)
// process and resolve the expression
- ReadCommittedScope readCommittedScope = null;
+ ReadCommittedScope readCommittedScope;
if (carbonTable.isTransactionalTable()) {
readCommittedScope = new LatestFilesReadCommittedScope(
identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
@@ -131,7 +126,7 @@
IndexFilter filter = getFilterPredicates(job.getConfiguration());
// if external table Segments are found, add it to the List
- List<Segment> externalTableSegments = new ArrayList<Segment>();
+ List<Segment> externalTableSegments = new ArrayList<>();
Segment seg;
if (carbonTable.isTransactionalTable()) {
// SDK some cases write into the Segment Path instead of Table Path i.e. inside
@@ -145,9 +140,9 @@
for (LoadMetadataDetails load : loadMetadataDetails) {
seg = new Segment(load.getLoadName(), null, readCommittedScope);
if (fileLists != null) {
- for (int i = 0; i < fileLists.size(); i++) {
+ for (Object fileList : fileLists) {
String timestamp =
- CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileLists.get(i).toString());
+ CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileList.toString());
if (timestamp.equals(seg.getSegmentNo())) {
externalTableSegments.add(seg);
break;
@@ -170,7 +165,7 @@
// do block filtering and get split
splits = getSplits(job, filter, externalTableSegments);
} else {
- List<CarbonFile> carbonFiles = null;
+ List<CarbonFile> carbonFiles;
if (null != this.fileLists) {
carbonFiles = getAllCarbonDataFiles(this.fileLists);
} else {
@@ -191,13 +186,7 @@
info.setUseMinMaxForPruning(false);
splits.add(split);
}
- Collections.sort(splits, new Comparator<InputSplit>() {
- @Override
- public int compare(InputSplit o1, InputSplit o2) {
- return ((CarbonInputSplit) o1).getFilePath()
- .compareTo(((CarbonInputSplit) o2).getFilePath());
- }
- });
+ splits.sort(Comparator.comparing(o -> ((CarbonInputSplit) o).getFilePath()));
}
setAllColumnProjectionIfNotConfigured(job, carbonTable);
return splits;
@@ -215,12 +204,8 @@
private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
List<CarbonFile> carbonFiles;
try {
- carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, new CarbonFileFilter() {
- @Override
- public boolean accept(CarbonFile file) {
- return file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT);
- }
- });
+ carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true,
+ file -> file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -228,10 +213,10 @@
}
private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
- List<CarbonFile> carbonFiles = new LinkedList<CarbonFile>();
+ List<CarbonFile> carbonFiles = new LinkedList<>();
try {
- for (int i = 0; i < fileLists.size(); i++) {
- carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
+ for (Object fileList : fileLists) {
+ carbonFiles.add(FileFactory.getCarbonFile(fileList.toString()));
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -247,20 +232,13 @@
* @return
* @throws IOException
*/
- private List<InputSplit> getSplits(
- JobContext job,
- IndexFilter indexFilter,
+ private List<InputSplit> getSplits(JobContext job, IndexFilter indexFilter,
List<Segment> validSegments) throws IOException {
-
numSegments = validSegments.size();
- List<InputSplit> result = new LinkedList<InputSplit>();
-
// for each segment fetch blocks matching filter in Driver BTree
- List<CarbonInputSplit> dataBlocksOfSegment =
- getDataBlocksOfSegment(job, carbonTable, indexFilter, validSegments,
- new ArrayList<Segment>(), new ArrayList<String>());
+ List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable,
+ indexFilter, validSegments, new ArrayList<>(), new ArrayList<>());
numBlocks = dataBlocksOfSegment.size();
- result.addAll(dataBlocksOfSegment);
- return result;
+ return new LinkedList<>(dataBlocksOfSegment);
}
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 5c9e5e1..130e0d9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -126,7 +126,7 @@
protected int numSegments = 0;
protected int numStreamSegments = 0;
protected int numStreamFiles = 0;
- protected int hitedStreamFiles = 0;
+ protected int hitStreamFiles = 0;
protected int numBlocks = 0;
protected List fileLists = null;
@@ -144,8 +144,8 @@
return numStreamFiles;
}
- public int getHitedStreamFiles() {
- return hitedStreamFiles;
+ public int getHitStreamFiles() {
+ return hitStreamFiles;
}
public int getNumBlocks() {
@@ -259,24 +259,13 @@
if (projection == null || projection.isEmpty()) {
return;
}
- String[] allColumns = projection.getAllColumns();
- StringBuilder builder = new StringBuilder();
- for (String column : allColumns) {
- builder.append(column).append(",");
- }
- String columnString = builder.toString();
- columnString = columnString.substring(0, columnString.length() - 1);
- configuration.set(COLUMN_PROJECTION, columnString);
+ setColumnProjection(configuration, projection.getAllColumns());
}
public static String getColumnProjection(Configuration configuration) {
return configuration.get(COLUMN_PROJECTION);
}
- public static void setFgIndexPruning(Configuration configuration, boolean enable) {
- configuration.set(FG_INDEX_PRUNING, String.valueOf(enable));
- }
-
public static boolean isFgIndexPruningEnable(Configuration configuration) {
String enable = configuration.get(FG_INDEX_PRUNING);
@@ -390,12 +379,9 @@
}
/**
- * {@inheritDoc}
- * Configurations FileInputFormat.INPUT_DIR
- * are used to get table path to read.
- *
- * @param job
- * @return List<InputSplit> list of CarbonInputSplit
+ * get list of block/blocklet and make them to CarbonInputSplit
+ * @param job JobContext with Configuration
+ * @return list of CarbonInputSplit
* @throws IOException
*/
@Override
@@ -409,7 +395,7 @@
Long getDistributedCount(CarbonTable table,
List<PartitionSpec> partitionNames, List<Segment> validSegments) {
IndexInputFormat indexInputFormat =
- new IndexInputFormat(table, null, validSegments, new ArrayList<String>(),
+ new IndexInputFormat(table, null, validSegments, new ArrayList<>(),
partitionNames, false, null, false, false);
indexInputFormat.setIsWriteToFile(false);
try {
@@ -497,7 +483,7 @@
// matchedPartitions variable will be null in two cases as follows
// 1. the table is not a partition table
// 2. the table is a partition table, and all partitions are matched by query
- // for partition table, the task id of carbaondata file name is the partition id.
+ // for partition table, the task id of carbondata file name is the partition id.
// if this partition is not required, here will skip it.
resultFilteredBlocks.add(blocklet.getInputSplit());
}
@@ -512,11 +498,11 @@
* get number of block by counting distinct file path of blocklets
*/
private int getBlockCount(List<ExtendedBlocklet> blocklets) {
- Set<String> filepaths = new HashSet<>();
+ Set<String> filePaths = new HashSet<>();
for (ExtendedBlocklet blocklet: blocklets) {
- filepaths.add(blocklet.getPath());
+ filePaths.add(blocklet.getPath());
}
- return filepaths.size();
+ return filePaths.size();
}
/**
@@ -537,7 +523,7 @@
List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
// First prune using default index on driver side.
TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(carbonTable);
- List<ExtendedBlocklet> prunedBlocklets = null;
+ List<ExtendedBlocklet> prunedBlocklets;
// This is to log the event, so user will know what is happening by seeing logs.
LOG.info("Started block pruning ...");
boolean isDistributedPruningEnabled = CarbonProperties.getInstance()
@@ -573,7 +559,7 @@
IndexChooser chooser = new IndexChooser(getOrCreateCarbonTable(job.getConfiguration()));
- // Get the available CG indexs and prune further.
+ // Get the available CG indexes and prune further.
IndexExprWrapper cgIndexExprWrapper = chooser.chooseCGIndex(filter.getResolver());
if (cgIndexExprWrapper != null) {
@@ -586,7 +572,7 @@
if (distributedCG && indexJob != null) {
cgPrunedBlocklets = IndexUtil
.executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
- segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<String>());
+ segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>());
} else {
cgPrunedBlocklets = cgIndexExprWrapper.prune(segmentIds, partitionsToPrune);
}
@@ -620,10 +606,9 @@
// Prune segments from already pruned blocklets
IndexUtil.pruneSegments(segmentIds, prunedBlocklets);
// Prune segments from already pruned blocklets
- fgPrunedBlocklets = IndexUtil
- .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
- segmentIds, invalidSegments, fgIndexExprWrapper.getIndexLevel(),
- new ArrayList<String>());
+ fgPrunedBlocklets = IndexUtil.executeIndexJob(
+ carbonTable, filter.getResolver(), indexJob, partitionsToPrune, segmentIds,
+ invalidSegments, fgIndexExprWrapper.getIndexLevel(), new ArrayList<>());
// note that the 'fgPrunedBlocklets' has extra index related info compared with
// 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
prunedBlocklets =
@@ -641,7 +626,7 @@
private List<ExtendedBlocklet> intersectFilteredBlocklets(CarbonTable carbonTable,
List<ExtendedBlocklet> previousIndexPrunedBlocklets,
List<ExtendedBlocklet> otherIndexPrunedBlocklets) {
- List<ExtendedBlocklet> prunedBlocklets = null;
+ List<ExtendedBlocklet> prunedBlocklets;
if (BlockletIndexUtil.isCacheLevelBlock(carbonTable)) {
prunedBlocklets = new ArrayList<>();
for (ExtendedBlocklet otherBlocklet : otherIndexPrunedBlocklets) {
@@ -705,18 +690,15 @@
.dataConverter(getDataTypeConverter(configuration))
.build();
String readDeltaOnly = configuration.get(READ_ONLY_DELTA);
- if (readDeltaOnly != null && Boolean.parseBoolean(readDeltaOnly)) {
+ if (Boolean.parseBoolean(readDeltaOnly)) {
queryModel.setReadOnlyDelta(true);
}
return queryModel;
}
/**
- * This method will create an Implict Expression and set it as right child in the given
+ * This method will create an Implicit Expression and set it as right child in the given
* expression
- *
- * @param expression
- * @param inputSplit
*/
private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) {
if (inputSplit instanceof CarbonMultiBlockSplit) {
@@ -793,7 +775,7 @@
/**
* It is optional, if user does not set then it reads from store
*
- * @param configuration
+ * @param configuration hadoop configuration
* @param converterClass is the Data type converter for different computing engine
*/
public static void setDataTypeConverter(
@@ -825,11 +807,11 @@
public static String getDatabaseName(Configuration configuration)
throws InvalidConfigurationException {
- String databseName = configuration.get(DATABASE_NAME);
- if (null == databseName) {
+ String databaseName = configuration.get(DATABASE_NAME);
+ if (null == databaseName) {
throw new InvalidConfigurationException("Database name is not set.");
}
- return databseName;
+ return databaseName;
}
public static void setTableName(Configuration configuration, String tableName) {
@@ -850,8 +832,7 @@
/**
* Project all Columns for carbon reader
*
- * @return String araay of columnNames
- * @param carbonTable
+ * @return String array of columnNames
*/
public String[] projectAllColumns(CarbonTable carbonTable) {
List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 6aa3067..4fd754b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -59,7 +59,7 @@
import org.apache.log4j.Logger;
/**
- * Outputcommitter which manages the segments during loading.It commits segment information to the
+ * OutputCommitter which manages the segments during loading.It commits segment information to the
* tablestatus file upon success or fail.
*/
public class CarbonOutputCommitter extends FileOutputCommitter {
@@ -74,10 +74,7 @@
}
/**
- * Update the tablestatus with inprogress while setup the job.
- *
- * @param context
- * @throws IOException
+ * Update the tablestatus with in-progress while setup the job.
*/
@Override
public void setupJob(JobContext context) throws IOException {
@@ -104,9 +101,6 @@
/**
* Update the tablestatus as success after job is success
- *
- * @param context
- * @throws IOException
*/
@Override
public void commitJob(JobContext context) throws IOException {
@@ -230,7 +224,7 @@
}
}
String updateTime =
- context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
+ context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, null);
String segmentsToBeDeleted =
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
@@ -327,10 +321,6 @@
/**
* Overwrite the partitions in case of overwrite query. It just updates the partition map files
* of all segment files.
- *
- * @param loadModel
- * @return
- * @throws IOException
*/
private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetails newMetaEntry,
String uuid) throws IOException {
@@ -345,19 +335,18 @@
new SegmentStatusManager(table.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments(table.isMV()).getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
- List<String> tobeUpdatedSegs = new ArrayList<>();
- List<String> tobeDeletedSegs = new ArrayList<>();
+ List<String> toBeUpdatedSegments = new ArrayList<>();
+ List<String> toBeDeletedSegments = new ArrayList<>();
// First drop the partitions from partition mapper files of each segment
for (Segment segment : validSegments) {
- new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName())
- .dropPartitions(segment, partitionSpecs, uniqueId, tobeDeletedSegs, tobeUpdatedSegs);
-
+ new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName()).dropPartitions(
+ segment, partitionSpecs, uniqueId, toBeDeletedSegments, toBeUpdatedSegments);
}
newMetaEntry.setUpdateStatusFileName(uniqueId);
// Commit the removed partitions in carbon store.
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
- Segment.toSegmentList(tobeDeletedSegs, null),
- Segment.toSegmentList(tobeUpdatedSegs, null));
+ Segment.toSegmentList(toBeDeletedSegments, null),
+ Segment.toSegmentList(toBeUpdatedSegments, null));
return uniqueId;
}
return null;
@@ -375,10 +364,6 @@
/**
* Update the tablestatus as fail if any fail happens.And also clean up the temp folders if any
* are existed.
- *
- * @param context
- * @param state
- * @throws IOException
*/
@Override
public void abortJob(JobContext context, JobStatus.State state) throws IOException {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 2d06222..bd0f5d1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -101,12 +101,9 @@
private ReadCommittedScope readCommittedScope;
/**
- * {@inheritDoc}
- * Configurations FileInputFormat.INPUT_DIR
- * are used to get table path to read.
- *
- * @param job
- * @return List<InputSplit> list of CarbonInputSplit
+ * get list of block/blocklet and make them to CarbonInputSplit
+ * @param job JobContext with Configuration
+ * @return list of CarbonInputSplit
* @throws IOException
*/
@Override
@@ -218,16 +215,6 @@
}
/**
- * Method to check and refresh segment cache
- *
- * @param job
- * @param carbonTable
- * @param updateStatusManager
- * @param filteredSegmentToAccess
- * @throws IOException
- */
-
- /**
* Return segment list after filtering out valid segments and segments set by user by
* `INPUT_SEGMENT_NUMBERS` in job configuration
*/
@@ -276,7 +263,7 @@
*/
public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
- List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<InputSplit> splits = new ArrayList<>();
if (streamSegments != null && !streamSegments.isEmpty()) {
numStreamSegments = streamSegments.size();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
@@ -294,7 +281,7 @@
streamPruner.init(filterResolverIntf);
List<StreamFile> streamFiles = streamPruner.prune(streamSegments);
// record the hit information of the streaming files
- this.hitedStreamFiles = streamFiles.size();
+ this.hitStreamFiles = streamFiles.size();
this.numStreamFiles = streamPruner.getTotalFileNums();
for (StreamFile streamFile : streamFiles) {
Path path = new Path(streamFile.getFilePath());
@@ -358,7 +345,7 @@
}
numSegments = validSegments.size();
- List<InputSplit> result = new LinkedList<InputSplit>();
+ List<InputSplit> result = new LinkedList<>();
UpdateVO invalidBlockVOForSegmentId = null;
boolean isIUDTable;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index ebac3d4..f52030c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -65,7 +65,7 @@
* creates new segment folder and manages the folder through tablestatus file.
* It also generate and writes dictionary data during load only if dictionary server is configured.
*/
-// TODO Move dictionary generater which is coded in spark to MR framework.
+// TODO Move dictionary generator which is coded in spark to MR framework.
public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {
protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
@@ -98,7 +98,7 @@
* Set the update timestamp if user sets in case of update query. It needs to be updated
* in load status update time
*/
- public static final String UPADTE_TIMESTAMP = "mapreduce.carbontable.update.timestamp";
+ public static final String UPDATE_TIMESTAMP = "mapreduce.carbontable.update.timestamp";
/**
* During update query we first delete the old data and then add updated data to new segment, so
@@ -108,11 +108,6 @@
public static final String SEGMENTS_TO_BE_DELETED =
"mapreduce.carbontable.segments.to.be.removed";
- /**
- * It is used only to fire events in case of any child tables to be loaded.
- */
- public static final String OPERATION_CONTEXT = "mapreduce.carbontable.operation.context";
-
private static final Logger LOG =
LogServiceFactory.getLogService(CarbonTableOutputFormat.class.getName());
@@ -269,29 +264,25 @@
new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(),
true));
// It should be started in new thread as the underlying iterator uses blocking queue.
- Future future = executorService.submit(new Thread() {
- @Override
- public void run() {
- ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo()
- .put("carbonConf", taskAttemptContext.getConfiguration());
- try {
- dataLoadExecutor
- .execute(loadModel, tempStoreLocations, iterators);
- } catch (Exception e) {
- executorService.shutdownNow();
- for (CarbonOutputIteratorWrapper iterator : iterators) {
- iterator.closeWriter(true);
- }
- try {
- dataLoadExecutor.close();
- } catch (Exception ex) {
- // As already exception happened before close() send that exception.
- throw new RuntimeException(e);
- }
- throw new RuntimeException(e);
- } finally {
- ThreadLocalSessionInfo.unsetAll();
+ Future future = executorService.submit(() -> {
+ ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo()
+ .put("carbonConf", taskAttemptContext.getConfiguration());
+ try {
+ dataLoadExecutor.execute(loadModel, tempStoreLocations, iterators);
+ } catch (Exception e) {
+ executorService.shutdownNow();
+ for (CarbonOutputIteratorWrapper iterator : iterators) {
+ iterator.closeWriter(true);
}
+ try {
+ dataLoadExecutor.close();
+ } catch (Exception ex) {
+ // As already exception happened before close() send that exception.
+ throw new RuntimeException(e);
+ }
+ throw new RuntimeException(e);
+ } finally {
+ ThreadLocalSessionInfo.unsetAll();
}
});
@@ -433,15 +424,15 @@
public static class CarbonRecordWriter extends RecordWriter<NullWritable, ObjectArrayWritable> {
- private CarbonOutputIteratorWrapper iteratorWrapper;
+ private final CarbonOutputIteratorWrapper iteratorWrapper;
- private DataLoadExecutor dataLoadExecutor;
+ private final DataLoadExecutor dataLoadExecutor;
- private CarbonLoadModel loadModel;
+ private final CarbonLoadModel loadModel;
- private ExecutorService executorService;
+ private final ExecutorService executorService;
- private Future future;
+ private final Future future;
private boolean isClosed;
@@ -526,11 +517,11 @@
and handles the load balancing of the write rows in round robin. */
public static class CarbonMultiRecordWriter extends CarbonRecordWriter {
- private CarbonOutputIteratorWrapper[] iterators;
+ private final CarbonOutputIteratorWrapper[] iterators;
// keep counts of number of writes called
// and it is used to load balance each write call to one iterator.
- private AtomicLong counter;
+ private final AtomicLong counter;
CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators,
DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
@@ -551,9 +542,9 @@
@Override
public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
- for (int i = 0; i < iterators.length; i++) {
- synchronized (iterators[i]) {
- iterators[i].closeWriter(false);
+ for (CarbonOutputIteratorWrapper iterator : iterators) {
+ synchronized (iterator) {
+ iterator.closeWriter(false);
}
}
super.close(taskAttemptContext);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
index df37e09..2a6e54f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
@@ -47,7 +47,7 @@
public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
public static final String STREAM_RECORD_READER_INSTANCE =
"org.apache.carbondata.stream.CarbonStreamRecordReader";
- // return raw row for handoff
+ // return raw row for hand off
private boolean useRawRow = false;
public void setUseRawRow(boolean useRawRow) {
@@ -114,7 +114,7 @@
for (int i = 0; i < dimension.getNumberOfChild(); i++) {
CarbonDimension child = dimension.getListOfChildDimensions().get(i);
DataType dataType = child.getDataType();
- GenericQueryType queryType = null;
+ GenericQueryType queryType;
if (DataTypes.isArrayType(dataType)) {
queryType =
new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentColumnIndex);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
index c060535..29170fa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
@@ -33,9 +33,9 @@
}
- public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
+ public static Object getInstanceWithReflection(Constructor cons, Object... initArgs) throws
IllegalAccessException,
InvocationTargetException, InstantiationException {
- return cons.newInstance(initargs);
+ return cons.newInstance(initArgs);
}
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
index be8bc5a..6d91e78 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
@@ -41,10 +41,10 @@
private final long limitStart;
private final long limitEnd;
private boolean isAlreadySync = false;
- private Compressor compressor;
+ private final Compressor compressor;
private int rowNums = 0;
private int rowIndex = 0;
- private boolean isHeaderPresent;
+ private final boolean isHeaderPresent;
public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
boolean isHeaderPresent, String compressorName) {
@@ -175,7 +175,7 @@
int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
pos += 4;
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
}
/**
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 52b10dc..14637ee 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -113,8 +113,8 @@
// empty project, null filter
protected boolean skipScanData;
- // return raw row for handoff
- private boolean useRawRow = false;
+ // return raw row for hand off
+ private final boolean useRawRow;
public StreamRecordReader(QueryModel mdl, boolean useRawRow) {
this.model = mdl;
@@ -137,7 +137,7 @@
// metadata
hadoopConf = context.getConfiguration();
if (model == null) {
- CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+ CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
model = format.createQueryModel(split, context);
}
carbonTable = model.getTable();
@@ -169,19 +169,19 @@
projection = model.getProjectionColumns();
isRequired = new boolean[storageColumns.length];
- boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
- boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
+ boolean[] isFilterDimensions = model.getIsFilterDimensions();
+ boolean[] isFilterMeasures = model.getIsFilterMeasures();
isFilterRequired = new boolean[storageColumns.length];
filterMap = new int[storageColumns.length];
for (int i = 0; i < storageColumns.length; i++) {
if (storageColumns[i].isDimension()) {
- if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
+ if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
isRequired[i] = true;
isFilterRequired[i] = true;
filterMap[i] = storageColumns[i].getOrdinal();
}
} else {
- if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
+ if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
isRequired[i] = true;
isFilterRequired[i] = true;
filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
@@ -279,7 +279,7 @@
scanMore = false;
} else {
if (useRawRow) {
- // read raw row for streaming handoff which does not require decode raw row
+ // read raw row for streaming hand off which does not require decode raw row
readRawRowFromStream();
} else {
readRowFromStream();
@@ -346,11 +346,7 @@
BitSet bitSet = filter
.isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(),
minMaxIndex.getIsMinMaxSet());
- if (bitSet.isEmpty()) {
- return false;
- } else {
- return true;
- }
+ return !bitSet.isEmpty();
}
}
return true;
@@ -375,7 +371,7 @@
}
} else {
if (isNoDictColumn[colCount]) {
- int v = 0;
+ int v;
if (dimensionsIsVarcharTypeMap[colCount]) {
v = input.readInt();
} else {
@@ -554,7 +550,7 @@
outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
} else {
if (isNoDictColumn[colCount]) {
- int v = 0;
+ int v;
if (dimensionsIsVarcharTypeMap[colCount]) {
v = input.readInt();
} else {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 2abfe7c..7c1d04f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -43,10 +43,10 @@
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -86,9 +86,9 @@
private static final Logger LOG =
LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
- private AbsoluteTableIdentifier absoluteTableIdentifier;
- private String storePath = null;
- private String csvPath;
+ private final AbsoluteTableIdentifier absoluteTableIdentifier;
+ private final String storePath;
+ private final String csvPath;
private List<String> sortColumns = new ArrayList<>();
public StoreCreator(String storePath, String csvPath) {
@@ -124,7 +124,7 @@
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
- loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
+ loadModel.setLoadMetadataDetails(new ArrayList<>());
loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
@@ -198,103 +198,22 @@
tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
- List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
+ List<ColumnSchema> columnSchemas = new ArrayList<>();
ArrayList<Encoding> encodings = new ArrayList<>();
int schemaOrdinal = 0;
- ColumnSchema id = new ColumnSchema();
- id.setColumnName("id");
- id.setDataType(DataTypes.INT);
- id.setEncodingList(encodings);
- id.setColumnUniqueId(UUID.randomUUID().toString());
- id.setColumnReferenceId(id.getColumnUniqueId());
- id.setDimensionColumn(true);
- id.setSchemaOrdinal(schemaOrdinal++);
- if (sortColumns.contains(id.getColumnName())) {
- id.setSortColumn(true);
- }
- columnSchemas.add(id);
-
- ColumnSchema date = new ColumnSchema();
- date.setColumnName("date");
- date.setDataType(DataTypes.STRING);
- date.setEncodingList(encodings);
- date.setColumnUniqueId(UUID.randomUUID().toString());
- date.setDimensionColumn(true);
- date.setColumnReferenceId(date.getColumnUniqueId());
- date.setSchemaOrdinal(schemaOrdinal++);
- if (sortColumns.contains(date.getColumnName())) {
- date.setSortColumn(true);
- }
- columnSchemas.add(date);
-
- ColumnSchema country = new ColumnSchema();
- country.setColumnName("country");
- country.setDataType(DataTypes.STRING);
- country.setEncodingList(encodings);
- country.setColumnUniqueId(UUID.randomUUID().toString());
- country.setDimensionColumn(true);
- country.setSortColumn(true);
- country.setSchemaOrdinal(schemaOrdinal++);
- if (sortColumns.contains(country.getColumnName())) {
- country.setSortColumn(true);
- }
- country.setColumnReferenceId(country.getColumnUniqueId());
- columnSchemas.add(country);
-
- ColumnSchema name = new ColumnSchema();
- name.setColumnName("name");
- name.setDataType(DataTypes.STRING);
- name.setEncodingList(encodings);
- name.setColumnUniqueId(UUID.randomUUID().toString());
- name.setDimensionColumn(true);
- name.setSchemaOrdinal(schemaOrdinal++);
- if (sortColumns.contains(name.getColumnName())) {
- name.setSortColumn(true);
- }
- name.setColumnReferenceId(name.getColumnUniqueId());
- columnSchemas.add(name);
-
- ColumnSchema phonetype = new ColumnSchema();
- phonetype.setColumnName("phonetype");
- phonetype.setDataType(DataTypes.STRING);
- phonetype.setEncodingList(encodings);
- phonetype.setColumnUniqueId(UUID.randomUUID().toString());
- phonetype.setDimensionColumn(true);
- phonetype.setSchemaOrdinal(schemaOrdinal++);
- if (sortColumns.contains(phonetype.getColumnName())) {
- phonetype.setSortColumn(true);
- }
- phonetype.setColumnReferenceId(phonetype.getColumnUniqueId());
- columnSchemas.add(phonetype);
-
- ColumnSchema serialname = new ColumnSchema();
- serialname.setColumnName("serialname");
- serialname.setDataType(DataTypes.STRING);
- serialname.setEncodingList(encodings);
- serialname.setColumnUniqueId(UUID.randomUUID().toString());
- serialname.setDimensionColumn(true);
- serialname.setSchemaOrdinal(schemaOrdinal++);
- if (sortColumns.contains(serialname.getColumnName())) {
- serialname.setSortColumn(true);
- }
- serialname.setColumnReferenceId(serialname.getColumnUniqueId());
- columnSchemas.add(serialname);
- ColumnSchema salary = new ColumnSchema();
- salary.setColumnName("salary");
- salary.setDataType(DataTypes.INT);
- salary.setEncodingList(new ArrayList<Encoding>());
- salary.setColumnUniqueId(UUID.randomUUID().toString());
- salary.setDimensionColumn(false);
- salary.setColumnReferenceId(salary.getColumnUniqueId());
- salary.setSchemaOrdinal(schemaOrdinal++);
- columnSchemas.add(salary);
-
+ addColumn(columnSchemas, encodings, schemaOrdinal++, "id", DataTypes.INT, true);
+ addColumn(columnSchemas, encodings, schemaOrdinal++, "date", DataTypes.STRING, true);
+ addColumn(columnSchemas, encodings, schemaOrdinal++, "country", DataTypes.STRING, true);
+ addColumn(columnSchemas, encodings, schemaOrdinal++, "name", DataTypes.STRING, true);
+ addColumn(columnSchemas, encodings, schemaOrdinal++, "phonetype", DataTypes.STRING, true);
+ addColumn(columnSchemas, encodings, schemaOrdinal++, "serialname", DataTypes.STRING, true);
+ addColumn(columnSchemas, encodings, schemaOrdinal, "salary", DataTypes.INT, false);
// rearrange the column schema based on the sort order, if sort columns exists
List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas);
tableSchema.setListOfColumns(columnSchemas1);
- SchemaEvolution schemaEvol = new SchemaEvolution();
- schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
- tableSchema.setSchemaEvolution(schemaEvol);
+ SchemaEvolution schemaEvolution = new SchemaEvolution();
+ schemaEvolution.setSchemaEvolutionEntryList(new ArrayList<>());
+ tableSchema.setSchemaEvolution(schemaEvolution);
tableSchema.setTableId(UUID.randomUUID().toString());
tableInfo.setTableUniqueName(
identifier.getCarbonTableIdentifier().getTableUniqueName()
@@ -328,6 +247,22 @@
return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
}
+ private void addColumn(List<ColumnSchema> columnSchemas, ArrayList<Encoding> encodings,
+ int schemaOrdinal, String name2, DataType dataType, boolean isDimensionColumn) {
+ ColumnSchema name = new ColumnSchema();
+ name.setColumnName(name2);
+ name.setDataType(dataType);
+ name.setEncodingList(encodings);
+ name.setColumnUniqueId(UUID.randomUUID().toString());
+ name.setColumnReferenceId(name.getColumnUniqueId());
+ name.setDimensionColumn(isDimensionColumn);
+ name.setSchemaOrdinal(schemaOrdinal);
+ if (sortColumns.contains(name.getColumnName())) {
+ name.setSortColumn(true);
+ }
+ columnSchemas.add(name);
+ }
+
private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
// add sort columns first
@@ -357,10 +292,6 @@
/**
* Execute graph which will further load data
- *
- * @param loadModel
- * @param storeLocation
- * @throws Exception
*/
public static void loadData(CarbonLoadModel loadModel, String storeLocation)
throws Exception {
@@ -425,7 +356,7 @@
writeLoadMetadata(
loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
- new ArrayList<LoadMetadataDetails>());
+ new ArrayList<>());
}
public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
@@ -460,13 +391,8 @@
writeOperation.setFailed();
throw ioe;
} finally {
- try {
- if (null != brWriter) {
- brWriter.flush();
- }
- } catch (Exception e) {
- throw e;
-
+ if (null != brWriter) {
+ brWriter.flush();
}
CarbonUtil.closeStreams(brWriter);
@@ -477,11 +403,7 @@
public static String readCurrentTime() {
SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
- String date = null;
-
- date = sdf.format(new Date());
-
- return date;
+ return sdf.format(new Date());
}
public static void main(String[] args) throws Exception {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 9916a18..da47981 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -21,11 +21,8 @@
import java.text.SimpleDateFormat;
import java.util.Locale;
-import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.index.IndexUtil;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.conf.Configuration;
@@ -33,31 +30,11 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.log4j.Logger;
/**
* Utility class
*/
public class CarbonInputFormatUtil {
-
- /**
- * Attribute for Carbon LOGGER.
- */
- private static final Logger LOGGER =
- LogServiceFactory.getLogService(CarbonProperties.class.getName());
-
- public static <V> CarbonFileInputFormat<V> createCarbonFileInputFormat(
- AbsoluteTableIdentifier identifier, Job job) throws IOException {
- CarbonFileInputFormat<V> carbonInputFormat = new CarbonFileInputFormat<V>();
- CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
- identifier.getCarbonTableIdentifier().getDatabaseName());
- CarbonTableInputFormat
- .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
- FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
- setIndexJobIfConfigured(job.getConfiguration());
- return carbonInputFormat;
- }
-
public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
AbsoluteTableIdentifier identifier,
Job job) throws IOException {
@@ -73,9 +50,6 @@
/**
* This method set IndexJob if configured
- *
- * @param conf
- * @throws IOException
*/
public static void setIndexJobIfConfigured(Configuration conf) throws IOException {
String className = "org.apache.carbondata.indexserver.EmbeddedIndexJob";
@@ -87,8 +61,7 @@
}
public static JobID getJobId(java.util.Date date, int batch) {
- String jobtrackerID = createJobTrackerID(date);
- return new JobID(jobtrackerID, batch);
+ String jobTrackerID = createJobTrackerID(date);
+ return new JobID(jobTrackerID, batch);
}
-
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
index 72a8d09..d21a25a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
@@ -51,7 +51,7 @@
@Override
public String[] getLocations() {
- Set<String> locations = new HashSet<String>();
+ Set<String> locations = new HashSet<>();
for (CarbonInputSplit splitInfo : carbonBlockInfoList) {
try {
locations.addAll(Arrays.asList(splitInfo.getLocations()));
@@ -90,55 +90,47 @@
/**
* Finding which node has the maximum number of blocks for it.
- *
- * @param splitList
- * @return
*/
public static List<String> maxNoNodes(List<CarbonInputSplit> splitList) {
boolean useIndex = true;
- Integer maxOccurence = 0;
+ Integer maxOccurrence = 0;
String maxNode = null;
- Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>();
+ Map<String, Integer> nodeAndOccurrenceMapping = new TreeMap<>();
- // populate the map of node and number of occurences of that node.
+ // populate the map of node and number of occurrences of that node.
for (CarbonInputSplit split : splitList) {
try {
for (String node : split.getLocations()) {
- Integer nodeOccurence = nodeAndOccurenceMapping.get(node);
- if (null == nodeOccurence) {
- nodeAndOccurenceMapping.put(node, 1);
- } else {
- nodeOccurence++;
- }
+ nodeAndOccurrenceMapping.putIfAbsent(node, 1);
}
} catch (IOException e) {
throw new RuntimeException("Fail to get location of split: " + split, e);
}
}
- Integer previousValueOccurence = null;
+ Integer previousValueOccurrence = null;
- // check which node is occured maximum times.
- for (Map.Entry<String, Integer> entry : nodeAndOccurenceMapping.entrySet()) {
+ // check which node is occurred maximum times.
+ for (Map.Entry<String, Integer> entry : nodeAndOccurrenceMapping.entrySet()) {
// finding the maximum node.
- if (entry.getValue() > maxOccurence) {
- maxOccurence = entry.getValue();
+ if (entry.getValue() > maxOccurrence) {
+ maxOccurrence = entry.getValue();
maxNode = entry.getKey();
}
- // first time scenario. initialzing the previous value.
- if (null == previousValueOccurence) {
- previousValueOccurence = entry.getValue();
+ // first time scenario. initializing the previous value.
+ if (null == previousValueOccurrence) {
+ previousValueOccurrence = entry.getValue();
} else {
// for the case where all the nodes have same number of blocks then
// we need to return complete list instead of max node.
- if (!Objects.equals(previousValueOccurence, entry.getValue())) {
+ if (!Objects.equals(previousValueOccurrence, entry.getValue())) {
useIndex = false;
}
}
}
- // if all the nodes have equal occurence then returning the complete key set.
+ // if all the nodes have equal occurrence then returning the complete key set.
if (useIndex) {
- return new ArrayList<>(nodeAndOccurenceMapping.keySet());
+ return new ArrayList<>(nodeAndOccurrenceMapping.keySet());
}
// if any max node is found then returning the max node.
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 54f4bf7..491094d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -69,10 +69,10 @@
private AbstractDetailQueryResultIterator iterator;
- private QueryModel queryModel;
+ private final QueryModel queryModel;
//This holds mapping of fetch index with respect to project col index.
// it is used when same col is used in projection many times.So need to fetch only that col.
- private List<Integer> projectionMapping = new ArrayList<>();
+ private final List<Integer> projectionMapping = new ArrayList<>();
public CarbonVectorizedRecordReader(QueryModel queryModel) {
this.queryModel = queryModel;
@@ -171,16 +171,16 @@
}
CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
- Map<String, Integer> colmap = new HashMap<>();
+ Map<String, Integer> columnMap = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
vectors[i] = new CarbonColumnVectorImpl(
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
fields[i].getDataType());
- if (colmap.containsKey(fields[i].getFieldName())) {
- int reusedIndex = colmap.get(fields[i].getFieldName());
+ if (columnMap.containsKey(fields[i].getFieldName())) {
+ int reusedIndex = columnMap.get(fields[i].getFieldName());
projectionMapping.add(reusedIndex);
} else {
- colmap.put(fields[i].getFieldName(), i);
+ columnMap.put(fields[i].getFieldName(), i);
projectionMapping.add(i);
}
}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5424b66..4d1f3a3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -193,7 +193,7 @@
logInfo(
s"""
| Identified no.of.streaming splits/tasks: ${ streamPartitions.size },
- | no.of.streaming files: ${format.getHitedStreamFiles},
+ | no.of.streaming files: ${format.getHitStreamFiles},
| no.of.total streaming files: ${format.getNumStreamFiles},
| no.of.total streaming segement: ${format.getNumStreamSegments}
""".stripMargin)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index d9c2633..e25c6e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -174,7 +174,7 @@
// in load status update time
val updateTimeStamp = options.get("updatetimestamp")
if (updateTimeStamp.isDefined) {
- conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+ conf.set(CarbonTableOutputFormat.UPDATE_TIMESTAMP, updateTimeStamp.get)
}
conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
CarbonTableOutputFormat.setLoadModel(conf, model)