[CARBONDATA-3889] Cleanup code for carbondata-hadoop module

Why is this PR needed?
The carbondata-hadoop module has accumulated redundant and outdated code and needs a cleanup pass.

What changes were proposed in this PR?
Clean up code in the carbondata-hadoop module: remove unused imports, fields, and methods; replace anonymous inner classes with lambdas and method references; use the diamond operator and enhanced for loops; mark effectively-final fields as final; and fix typos in comments and identifiers (for example, UPADTE_TIMESTAMP becomes UPDATE_TIMESTAMP and getHitedStreamFiles becomes getHitStreamFiles). A small sketch of the recurring patterns is shown below.
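
For illustration, a minimal, self-contained sketch of two of the recurring patterns applied in the diff (replacing an anonymous Comparator with a method reference, and using the diamond operator and enhanced for loops). The class name and file paths here are made up; this is not code from the module itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only -- a standalone example of the cleanup patterns,
// not code from the carbondata-hadoop module.
public class CleanupPatternExample {
  public static void main(String[] args) {
    // Diamond operator: was `new ArrayList<String>()` before the cleanup.
    List<String> filePaths = new ArrayList<>();
    filePaths.add("/store/Fact/Part0/part-0002.carbondata");
    filePaths.add("/store/Fact/Part0/part-0001.carbondata");

    // Method reference instead of an anonymous Comparator:
    // was `Collections.sort(filePaths, new Comparator<String>() { ... })`.
    filePaths.sort(Comparator.comparing(String::toLowerCase));

    // Enhanced for loop instead of an index-based loop over the list.
    for (String path : filePaths) {
      System.out.println(path);
    }
  }
}
```

The same shapes appear in CarbonFileInputFormat, CarbonTableInputFormat, and CarbonTableOutputFormat in the diff below.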

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This closes #3827
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
index 5923ab7..15898d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -31,9 +31,7 @@
   protected int rowCount = 0;
 
   /**
-   * This method will log query result count and querytime
-   * @param recordCount
-   * @param recorder
+   * This method will log query result count and query time
    */
   public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) {
     // result size
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 64901cf..f6a11a2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -139,8 +139,8 @@
     }
     getLocationIfNull();
     out.writeInt(locations.length);
-    for (int i = 0; i < locations.length; i++) {
-      out.writeUTF(locations[i]);
+    for (String location : locations) {
+      out.writeUTF(location);
     }
     out.writeInt(fileFormat.ordinal());
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
index 13ccaa9..12c03c0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.hadoop;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.Objects;
 import java.util.Set;
@@ -29,16 +30,14 @@
 
   private static final long serialVersionUID = -4328676723039530713L;
 
-  private Set<String> columns = new LinkedHashSet<>();
+  private final Set<String> columns = new LinkedHashSet<>();
 
   public CarbonProjection() {
   }
 
   public CarbonProjection(String[] columnNames) {
     Objects.requireNonNull(columnNames);
-    for (String columnName : columnNames) {
-      columns.add(columnName);
-    }
+    columns.addAll(Arrays.asList(columnNames));
   }
 
   public void addColumn(String column) {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 943bd76..2c5c821 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -43,13 +43,13 @@
  */
 public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
 
-  protected QueryModel queryModel;
+  protected final QueryModel queryModel;
 
-  protected CarbonReadSupport<T> readSupport;
+  protected final CarbonReadSupport<T> readSupport;
 
   protected CarbonIterator<Object[]> carbonIterator;
 
-  protected QueryExecutor queryExecutor;
+  protected final QueryExecutor queryExecutor;
   private InputMetricsStats inputMetricsStats;
 
   /**
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 5568a29..9c599ef 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
@@ -28,7 +27,6 @@
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.IndexFilter;
 import org.apache.carbondata.core.index.Segment;
@@ -94,12 +92,9 @@
   }
 
   /**
-   * {@inheritDoc}
-   * Configurations FileInputFormat.INPUT_DIR
-   * are used to get table path to read.
-   *
-   * @param job
-   * @return List<InputSplit> list of CarbonInputSplit
+   * get list of block/blocklet and make them to CarbonInputSplit
+   * @param job JobContext with Configuration
+   * @return list of CarbonInputSplit
    * @throws IOException
    */
   @Override
@@ -114,7 +109,7 @@
     // check for externalTable segment (Segment_null)
     // process and resolve the expression
 
-    ReadCommittedScope readCommittedScope = null;
+    ReadCommittedScope readCommittedScope;
     if (carbonTable.isTransactionalTable()) {
       readCommittedScope = new LatestFilesReadCommittedScope(
           identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
@@ -131,7 +126,7 @@
     IndexFilter filter = getFilterPredicates(job.getConfiguration());
 
     // if external table Segments are found, add it to the List
-    List<Segment> externalTableSegments = new ArrayList<Segment>();
+    List<Segment> externalTableSegments = new ArrayList<>();
     Segment seg;
     if (carbonTable.isTransactionalTable()) {
       // SDK some cases write into the Segment Path instead of Table Path i.e. inside
@@ -145,9 +140,9 @@
       for (LoadMetadataDetails load : loadMetadataDetails) {
         seg = new Segment(load.getLoadName(), null, readCommittedScope);
         if (fileLists != null) {
-          for (int i = 0; i < fileLists.size(); i++) {
+          for (Object fileList : fileLists) {
             String timestamp =
-                CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileLists.get(i).toString());
+                CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileList.toString());
             if (timestamp.equals(seg.getSegmentNo())) {
               externalTableSegments.add(seg);
               break;
@@ -170,7 +165,7 @@
       // do block filtering and get split
       splits = getSplits(job, filter, externalTableSegments);
     } else {
-      List<CarbonFile> carbonFiles = null;
+      List<CarbonFile> carbonFiles;
       if (null != this.fileLists) {
         carbonFiles = getAllCarbonDataFiles(this.fileLists);
       } else {
@@ -191,13 +186,7 @@
         info.setUseMinMaxForPruning(false);
         splits.add(split);
       }
-      Collections.sort(splits, new Comparator<InputSplit>() {
-        @Override
-        public int compare(InputSplit o1, InputSplit o2) {
-          return ((CarbonInputSplit) o1).getFilePath()
-              .compareTo(((CarbonInputSplit) o2).getFilePath());
-        }
-      });
+      splits.sort(Comparator.comparing(o -> ((CarbonInputSplit) o).getFilePath()));
     }
     setAllColumnProjectionIfNotConfigured(job, carbonTable);
     return splits;
@@ -215,12 +204,8 @@
   private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
     List<CarbonFile> carbonFiles;
     try {
-      carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT);
-        }
-      });
+      carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true,
+          file -> file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -228,10 +213,10 @@
   }
 
   private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
-    List<CarbonFile> carbonFiles = new LinkedList<CarbonFile>();
+    List<CarbonFile> carbonFiles = new LinkedList<>();
     try {
-      for (int i = 0; i < fileLists.size(); i++) {
-        carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
+      for (Object fileList : fileLists) {
+        carbonFiles.add(FileFactory.getCarbonFile(fileList.toString()));
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -247,20 +232,13 @@
    * @return
    * @throws IOException
    */
-  private List<InputSplit> getSplits(
-      JobContext job,
-      IndexFilter indexFilter,
+  private List<InputSplit> getSplits(JobContext job, IndexFilter indexFilter,
       List<Segment> validSegments) throws IOException {
-
     numSegments = validSegments.size();
-    List<InputSplit> result = new LinkedList<InputSplit>();
-
     // for each segment fetch blocks matching filter in Driver BTree
-    List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, indexFilter, validSegments,
-            new ArrayList<Segment>(), new ArrayList<String>());
+    List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable,
+        indexFilter, validSegments, new ArrayList<>(), new ArrayList<>());
     numBlocks = dataBlocksOfSegment.size();
-    result.addAll(dataBlocksOfSegment);
-    return result;
+    return new LinkedList<>(dataBlocksOfSegment);
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 5c9e5e1..130e0d9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -126,7 +126,7 @@
   protected int numSegments = 0;
   protected int numStreamSegments = 0;
   protected int numStreamFiles = 0;
-  protected int hitedStreamFiles = 0;
+  protected int hitStreamFiles = 0;
   protected int numBlocks = 0;
   protected List fileLists = null;
 
@@ -144,8 +144,8 @@
     return numStreamFiles;
   }
 
-  public int getHitedStreamFiles() {
-    return hitedStreamFiles;
+  public int getHitStreamFiles() {
+    return hitStreamFiles;
   }
 
   public int getNumBlocks() {
@@ -259,24 +259,13 @@
     if (projection == null || projection.isEmpty()) {
       return;
     }
-    String[] allColumns = projection.getAllColumns();
-    StringBuilder builder = new StringBuilder();
-    for (String column : allColumns) {
-      builder.append(column).append(",");
-    }
-    String columnString = builder.toString();
-    columnString = columnString.substring(0, columnString.length() - 1);
-    configuration.set(COLUMN_PROJECTION, columnString);
+    setColumnProjection(configuration, projection.getAllColumns());
   }
 
   public static String getColumnProjection(Configuration configuration) {
     return configuration.get(COLUMN_PROJECTION);
   }
 
-  public static void setFgIndexPruning(Configuration configuration, boolean enable) {
-    configuration.set(FG_INDEX_PRUNING, String.valueOf(enable));
-  }
-
   public static boolean isFgIndexPruningEnable(Configuration configuration) {
     String enable = configuration.get(FG_INDEX_PRUNING);
 
@@ -390,12 +379,9 @@
   }
 
   /**
-   * {@inheritDoc}
-   * Configurations FileInputFormat.INPUT_DIR
-   * are used to get table path to read.
-   *
-   * @param job
-   * @return List<InputSplit> list of CarbonInputSplit
+   * get list of block/blocklet and make them to CarbonInputSplit
+   * @param job JobContext with Configuration
+   * @return list of CarbonInputSplit
    * @throws IOException
    */
   @Override
@@ -409,7 +395,7 @@
   Long getDistributedCount(CarbonTable table,
       List<PartitionSpec> partitionNames, List<Segment> validSegments) {
     IndexInputFormat indexInputFormat =
-        new IndexInputFormat(table, null, validSegments, new ArrayList<String>(),
+        new IndexInputFormat(table, null, validSegments, new ArrayList<>(),
             partitionNames, false, null, false, false);
     indexInputFormat.setIsWriteToFile(false);
     try {
@@ -497,7 +483,7 @@
       // matchedPartitions variable will be null in two cases as follows
       // 1. the table is not a partition table
       // 2. the table is a partition table, and all partitions are matched by query
-      // for partition table, the task id of carbaondata file name is the partition id.
+      // for partition table, the task id of carbondata file name is the partition id.
       // if this partition is not required, here will skip it.
       resultFilteredBlocks.add(blocklet.getInputSplit());
     }
@@ -512,11 +498,11 @@
    * get number of block by counting distinct file path of blocklets
    */
   private int getBlockCount(List<ExtendedBlocklet> blocklets) {
-    Set<String> filepaths = new HashSet<>();
+    Set<String> filePaths = new HashSet<>();
     for (ExtendedBlocklet blocklet: blocklets) {
-      filepaths.add(blocklet.getPath());
+      filePaths.add(blocklet.getPath());
     }
-    return filepaths.size();
+    return filePaths.size();
   }
 
   /**
@@ -537,7 +523,7 @@
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     // First prune using default index on driver side.
     TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(carbonTable);
-    List<ExtendedBlocklet> prunedBlocklets = null;
+    List<ExtendedBlocklet> prunedBlocklets;
     // This is to log the event, so user will know what is happening by seeing logs.
     LOG.info("Started block pruning ...");
     boolean isDistributedPruningEnabled = CarbonProperties.getInstance()
@@ -573,7 +559,7 @@
 
       IndexChooser chooser = new IndexChooser(getOrCreateCarbonTable(job.getConfiguration()));
 
-      // Get the available CG indexs and prune further.
+      // Get the available CG indexes and prune further.
       IndexExprWrapper cgIndexExprWrapper = chooser.chooseCGIndex(filter.getResolver());
 
       if (cgIndexExprWrapper != null) {
@@ -586,7 +572,7 @@
           if (distributedCG && indexJob != null) {
             cgPrunedBlocklets = IndexUtil
                 .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
-                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<String>());
+                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>());
           } else {
             cgPrunedBlocklets = cgIndexExprWrapper.prune(segmentIds, partitionsToPrune);
           }
@@ -620,10 +606,9 @@
           // Prune segments from already pruned blocklets
           IndexUtil.pruneSegments(segmentIds, prunedBlocklets);
           // Prune segments from already pruned blocklets
-          fgPrunedBlocklets = IndexUtil
-              .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
-                  segmentIds, invalidSegments, fgIndexExprWrapper.getIndexLevel(),
-                  new ArrayList<String>());
+          fgPrunedBlocklets = IndexUtil.executeIndexJob(
+              carbonTable, filter.getResolver(), indexJob, partitionsToPrune, segmentIds,
+              invalidSegments, fgIndexExprWrapper.getIndexLevel(), new ArrayList<>());
           // note that the 'fgPrunedBlocklets' has extra index related info compared with
           // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
           prunedBlocklets =
@@ -641,7 +626,7 @@
   private List<ExtendedBlocklet> intersectFilteredBlocklets(CarbonTable carbonTable,
       List<ExtendedBlocklet> previousIndexPrunedBlocklets,
       List<ExtendedBlocklet> otherIndexPrunedBlocklets) {
-    List<ExtendedBlocklet> prunedBlocklets = null;
+    List<ExtendedBlocklet> prunedBlocklets;
     if (BlockletIndexUtil.isCacheLevelBlock(carbonTable)) {
       prunedBlocklets = new ArrayList<>();
       for (ExtendedBlocklet otherBlocklet : otherIndexPrunedBlocklets) {
@@ -705,18 +690,15 @@
         .dataConverter(getDataTypeConverter(configuration))
         .build();
     String readDeltaOnly = configuration.get(READ_ONLY_DELTA);
-    if (readDeltaOnly != null && Boolean.parseBoolean(readDeltaOnly)) {
+    if (Boolean.parseBoolean(readDeltaOnly)) {
       queryModel.setReadOnlyDelta(true);
     }
     return queryModel;
   }
 
   /**
-   * This method will create an Implict Expression and set it as right child in the given
+   * This method will create an Implicit Expression and set it as right child in the given
    * expression
-   *
-   * @param expression
-   * @param inputSplit
    */
   private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) {
     if (inputSplit instanceof CarbonMultiBlockSplit) {
@@ -793,7 +775,7 @@
   /**
    * It is optional, if user does not set then it reads from store
    *
-   * @param configuration
+   * @param configuration hadoop configuration
    * @param converterClass is the Data type converter for different computing engine
    */
   public static void setDataTypeConverter(
@@ -825,11 +807,11 @@
 
   public static String getDatabaseName(Configuration configuration)
       throws InvalidConfigurationException {
-    String databseName = configuration.get(DATABASE_NAME);
-    if (null == databseName) {
+    String databaseName = configuration.get(DATABASE_NAME);
+    if (null == databaseName) {
       throw new InvalidConfigurationException("Database name is not set.");
     }
-    return databseName;
+    return databaseName;
   }
 
   public static void setTableName(Configuration configuration, String tableName) {
@@ -850,8 +832,7 @@
   /**
    * Project all Columns for carbon reader
    *
-   * @return String araay of columnNames
-   * @param carbonTable
+   * @return String array of columnNames
    */
   public String[] projectAllColumns(CarbonTable carbonTable) {
     List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns();
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 6aa3067..4fd754b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -59,7 +59,7 @@
 import org.apache.log4j.Logger;
 
 /**
- * Outputcommitter which manages the segments during loading.It commits segment information to the
+ * OutputCommitter which manages the segments during loading.It commits segment information to the
  * tablestatus file upon success or fail.
  */
 public class CarbonOutputCommitter extends FileOutputCommitter {
@@ -74,10 +74,7 @@
   }
 
   /**
-   * Update the tablestatus with inprogress while setup the job.
-   *
-   * @param context
-   * @throws IOException
+   * Update the tablestatus with in-progress while setup the job.
    */
   @Override
   public void setupJob(JobContext context) throws IOException {
@@ -104,9 +101,6 @@
 
   /**
    * Update the tablestatus as success after job is success
-   *
-   * @param context
-   * @throws IOException
    */
   @Override
   public void commitJob(JobContext context) throws IOException {
@@ -230,7 +224,7 @@
       }
     }
     String updateTime =
-        context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
+        context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, null);
     String segmentsToBeDeleted =
         context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
     List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
@@ -327,10 +321,6 @@
   /**
    * Overwrite the partitions in case of overwrite query. It just updates the partition map files
    * of all segment files.
-   *
-   * @param loadModel
-   * @return
-   * @throws IOException
    */
   private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetails newMetaEntry,
       String uuid) throws IOException {
@@ -345,19 +335,18 @@
           new SegmentStatusManager(table.getAbsoluteTableIdentifier())
               .getValidAndInvalidSegments(table.isMV()).getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
-      List<String> tobeUpdatedSegs = new ArrayList<>();
-      List<String> tobeDeletedSegs = new ArrayList<>();
+      List<String> toBeUpdatedSegments = new ArrayList<>();
+      List<String> toBeDeletedSegments = new ArrayList<>();
       // First drop the partitions from partition mapper files of each segment
       for (Segment segment : validSegments) {
-        new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName())
-            .dropPartitions(segment, partitionSpecs, uniqueId, tobeDeletedSegs, tobeUpdatedSegs);
-
+        new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName()).dropPartitions(
+            segment, partitionSpecs, uniqueId, toBeDeletedSegments, toBeUpdatedSegments);
       }
       newMetaEntry.setUpdateStatusFileName(uniqueId);
       // Commit the removed partitions in carbon store.
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
-          Segment.toSegmentList(tobeDeletedSegs, null),
-          Segment.toSegmentList(tobeUpdatedSegs, null));
+          Segment.toSegmentList(toBeDeletedSegments, null),
+          Segment.toSegmentList(toBeUpdatedSegments, null));
       return uniqueId;
     }
     return null;
@@ -375,10 +364,6 @@
   /**
    * Update the tablestatus as fail if any fail happens.And also clean up the temp folders if any
    * are existed.
-   *
-   * @param context
-   * @param state
-   * @throws IOException
    */
   @Override
   public void abortJob(JobContext context, JobStatus.State state) throws IOException {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 2d06222..bd0f5d1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -101,12 +101,9 @@
   private ReadCommittedScope readCommittedScope;
 
   /**
-   * {@inheritDoc}
-   * Configurations FileInputFormat.INPUT_DIR
-   * are used to get table path to read.
-   *
-   * @param job
-   * @return List<InputSplit> list of CarbonInputSplit
+   * get list of block/blocklet and make them to CarbonInputSplit
+   * @param job JobContext with Configuration
+   * @return list of CarbonInputSplit
    * @throws IOException
    */
   @Override
@@ -218,16 +215,6 @@
   }
 
   /**
-   * Method to check and refresh segment cache
-   *
-   * @param job
-   * @param carbonTable
-   * @param updateStatusManager
-   * @param filteredSegmentToAccess
-   * @throws IOException
-   */
-
-  /**
    * Return segment list after filtering out valid segments and segments set by user by
    * `INPUT_SEGMENT_NUMBERS` in job configuration
    */
@@ -276,7 +263,7 @@
    */
   public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
       CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
-    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<InputSplit> splits = new ArrayList<>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
       numStreamSegments = streamSegments.size();
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
@@ -294,7 +281,7 @@
       streamPruner.init(filterResolverIntf);
       List<StreamFile> streamFiles = streamPruner.prune(streamSegments);
       // record the hit information of the streaming files
-      this.hitedStreamFiles = streamFiles.size();
+      this.hitStreamFiles = streamFiles.size();
       this.numStreamFiles = streamPruner.getTotalFileNums();
       for (StreamFile streamFile : streamFiles) {
         Path path = new Path(streamFile.getFilePath());
@@ -358,7 +345,7 @@
     }
 
     numSegments = validSegments.size();
-    List<InputSplit> result = new LinkedList<InputSplit>();
+    List<InputSplit> result = new LinkedList<>();
     UpdateVO invalidBlockVOForSegmentId = null;
     boolean isIUDTable;
 
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index ebac3d4..f52030c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -65,7 +65,7 @@
  * creates new segment folder and manages the folder through tablestatus file.
  * It also generate and writes dictionary data during load only if dictionary server is configured.
  */
-// TODO Move dictionary generater which is coded in spark to MR framework.
+// TODO Move dictionary generator which is coded in spark to MR framework.
 public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {
 
   protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
@@ -98,7 +98,7 @@
    * Set the update timestamp if user sets in case of update query. It needs to be updated
    * in load status update time
    */
-  public static final String UPADTE_TIMESTAMP = "mapreduce.carbontable.update.timestamp";
+  public static final String UPDATE_TIMESTAMP = "mapreduce.carbontable.update.timestamp";
 
   /**
    * During update query we first delete the old data and then add updated data to new segment, so
@@ -108,11 +108,6 @@
   public static final String SEGMENTS_TO_BE_DELETED =
       "mapreduce.carbontable.segments.to.be.removed";
 
-  /**
-   * It is used only to fire events in case of any child tables to be loaded.
-   */
-  public static final String OPERATION_CONTEXT = "mapreduce.carbontable.operation.context";
-
   private static final Logger LOG =
       LogServiceFactory.getLogService(CarbonTableOutputFormat.class.getName());
 
@@ -269,29 +264,25 @@
         new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(),
                 true));
     // It should be started in new thread as the underlying iterator uses blocking queue.
-    Future future = executorService.submit(new Thread() {
-      @Override
-      public void run() {
-        ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo()
-            .put("carbonConf", taskAttemptContext.getConfiguration());
-        try {
-          dataLoadExecutor
-              .execute(loadModel, tempStoreLocations, iterators);
-        } catch (Exception e) {
-          executorService.shutdownNow();
-          for (CarbonOutputIteratorWrapper iterator : iterators) {
-            iterator.closeWriter(true);
-          }
-          try {
-            dataLoadExecutor.close();
-          } catch (Exception ex) {
-            // As already exception happened before close() send that exception.
-            throw new RuntimeException(e);
-          }
-          throw new RuntimeException(e);
-        } finally {
-          ThreadLocalSessionInfo.unsetAll();
+    Future future = executorService.submit(() -> {
+      ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo()
+          .put("carbonConf", taskAttemptContext.getConfiguration());
+      try {
+        dataLoadExecutor.execute(loadModel, tempStoreLocations, iterators);
+      } catch (Exception e) {
+        executorService.shutdownNow();
+        for (CarbonOutputIteratorWrapper iterator : iterators) {
+          iterator.closeWriter(true);
         }
+        try {
+          dataLoadExecutor.close();
+        } catch (Exception ex) {
+          // As already exception happened before close() send that exception.
+          throw new RuntimeException(e);
+        }
+        throw new RuntimeException(e);
+      } finally {
+        ThreadLocalSessionInfo.unsetAll();
       }
     });
 
@@ -433,15 +424,15 @@
 
   public static class CarbonRecordWriter extends RecordWriter<NullWritable, ObjectArrayWritable> {
 
-    private CarbonOutputIteratorWrapper iteratorWrapper;
+    private final CarbonOutputIteratorWrapper iteratorWrapper;
 
-    private DataLoadExecutor dataLoadExecutor;
+    private final DataLoadExecutor dataLoadExecutor;
 
-    private CarbonLoadModel loadModel;
+    private final CarbonLoadModel loadModel;
 
-    private ExecutorService executorService;
+    private final ExecutorService executorService;
 
-    private Future future;
+    private final Future future;
 
     private boolean isClosed;
 
@@ -526,11 +517,11 @@
   and handles the load balancing of the write rows in round robin. */
   public static class CarbonMultiRecordWriter extends CarbonRecordWriter {
 
-    private CarbonOutputIteratorWrapper[] iterators;
+    private final CarbonOutputIteratorWrapper[] iterators;
 
     // keep counts of number of writes called
     // and it is used to load balance each write call to one iterator.
-    private AtomicLong counter;
+    private final AtomicLong counter;
 
     CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators,
         DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
@@ -551,9 +542,9 @@
 
     @Override
     public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
-      for (int i = 0; i < iterators.length; i++) {
-        synchronized (iterators[i]) {
-          iterators[i].closeWriter(false);
+      for (CarbonOutputIteratorWrapper iterator : iterators) {
+        synchronized (iterator) {
+          iterator.closeWriter(false);
         }
       }
       super.close(taskAttemptContext);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
index df37e09..2a6e54f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
@@ -47,7 +47,7 @@
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
   public static final String STREAM_RECORD_READER_INSTANCE =
       "org.apache.carbondata.stream.CarbonStreamRecordReader";
-  // return raw row for handoff
+  // return raw row for hand off
   private boolean useRawRow = false;
 
   public void setUseRawRow(boolean useRawRow) {
@@ -114,7 +114,7 @@
     for (int i = 0; i < dimension.getNumberOfChild(); i++) {
       CarbonDimension child = dimension.getListOfChildDimensions().get(i);
       DataType dataType = child.getDataType();
-      GenericQueryType queryType = null;
+      GenericQueryType queryType;
       if (DataTypes.isArrayType(dataType)) {
         queryType =
             new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentColumnIndex);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
index c060535..29170fa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
@@ -33,9 +33,9 @@
 
   }
 
-  public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
+  public static Object getInstanceWithReflection(Constructor cons, Object... initArgs) throws
           IllegalAccessException,
           InvocationTargetException, InstantiationException {
-    return cons.newInstance(initargs);
+    return cons.newInstance(initArgs);
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
index be8bc5a..6d91e78 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
@@ -41,10 +41,10 @@
   private final long limitStart;
   private final long limitEnd;
   private boolean isAlreadySync = false;
-  private Compressor compressor;
+  private final Compressor compressor;
   private int rowNums = 0;
   private int rowIndex = 0;
-  private boolean isHeaderPresent;
+  private final boolean isHeaderPresent;
 
   public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
       boolean isHeaderPresent, String compressorName) {
@@ -175,7 +175,7 @@
     int ch4 = in.read();
     if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
     pos += 4;
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
   }
 
   /**
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 52b10dc..14637ee 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -113,8 +113,8 @@
   // empty project, null filter
   protected boolean skipScanData;
 
-  // return raw row for handoff
-  private boolean useRawRow = false;
+  // return raw row for hand off
+  private final boolean useRawRow;
 
   public StreamRecordReader(QueryModel mdl, boolean useRawRow) {
     this.model = mdl;
@@ -137,7 +137,7 @@
     // metadata
     hadoopConf = context.getConfiguration();
     if (model == null) {
-      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
       model = format.createQueryModel(split, context);
     }
     carbonTable = model.getTable();
@@ -169,19 +169,19 @@
     projection = model.getProjectionColumns();
 
     isRequired = new boolean[storageColumns.length];
-    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
-    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
+    boolean[] isFilterDimensions = model.getIsFilterDimensions();
+    boolean[] isFilterMeasures = model.getIsFilterMeasures();
     isFilterRequired = new boolean[storageColumns.length];
     filterMap = new int[storageColumns.length];
     for (int i = 0; i < storageColumns.length; i++) {
       if (storageColumns[i].isDimension()) {
-        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
+        if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
           isRequired[i] = true;
           isFilterRequired[i] = true;
           filterMap[i] = storageColumns[i].getOrdinal();
         }
       } else {
-        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
+        if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
           isRequired[i] = true;
           isFilterRequired[i] = true;
           filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
@@ -279,7 +279,7 @@
             scanMore = false;
           } else {
             if (useRawRow) {
-              // read raw row for streaming handoff which does not require decode raw row
+              // read raw row for streaming hand off which does not require decode raw row
               readRawRowFromStream();
             } else {
               readRowFromStream();
@@ -346,11 +346,7 @@
         BitSet bitSet = filter
             .isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(),
                 minMaxIndex.getIsMinMaxSet());
-        if (bitSet.isEmpty()) {
-          return false;
-        } else {
-          return true;
-        }
+        return !bitSet.isEmpty();
       }
     }
     return true;
@@ -375,7 +371,7 @@
         }
       } else {
         if (isNoDictColumn[colCount]) {
-          int v = 0;
+          int v;
           if (dimensionsIsVarcharTypeMap[colCount]) {
             v = input.readInt();
           } else {
@@ -554,7 +550,7 @@
         outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
       } else {
         if (isNoDictColumn[colCount]) {
-          int v = 0;
+          int v;
           if (dimensionsIsVarcharTypeMap[colCount]) {
             v = input.readInt();
           } else {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 2abfe7c..7c1d04f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -43,10 +43,10 @@
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -86,9 +86,9 @@
 
   private static final Logger LOG =
       LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
-  private String storePath = null;
-  private String csvPath;
+  private final AbsoluteTableIdentifier absoluteTableIdentifier;
+  private final String storePath;
+  private final String csvPath;
   private List<String> sortColumns = new ArrayList<>();
 
   public StoreCreator(String storePath, String csvPath) {
@@ -124,7 +124,7 @@
     loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
     loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
     loadModel.setFactFilePath(factFilePath);
-    loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
+    loadModel.setLoadMetadataDetails(new ArrayList<>());
     loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
     loadModel.setDateFormat(null);
     loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
@@ -198,103 +198,22 @@
     tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
     tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
-    List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
     ArrayList<Encoding> encodings = new ArrayList<>();
     int schemaOrdinal = 0;
-    ColumnSchema id = new ColumnSchema();
-    id.setColumnName("id");
-    id.setDataType(DataTypes.INT);
-    id.setEncodingList(encodings);
-    id.setColumnUniqueId(UUID.randomUUID().toString());
-    id.setColumnReferenceId(id.getColumnUniqueId());
-    id.setDimensionColumn(true);
-    id.setSchemaOrdinal(schemaOrdinal++);
-    if (sortColumns.contains(id.getColumnName())) {
-      id.setSortColumn(true);
-    }
-    columnSchemas.add(id);
-
-    ColumnSchema date = new ColumnSchema();
-    date.setColumnName("date");
-    date.setDataType(DataTypes.STRING);
-    date.setEncodingList(encodings);
-    date.setColumnUniqueId(UUID.randomUUID().toString());
-    date.setDimensionColumn(true);
-    date.setColumnReferenceId(date.getColumnUniqueId());
-    date.setSchemaOrdinal(schemaOrdinal++);
-    if (sortColumns.contains(date.getColumnName())) {
-      date.setSortColumn(true);
-    }
-    columnSchemas.add(date);
-
-    ColumnSchema country = new ColumnSchema();
-    country.setColumnName("country");
-    country.setDataType(DataTypes.STRING);
-    country.setEncodingList(encodings);
-    country.setColumnUniqueId(UUID.randomUUID().toString());
-    country.setDimensionColumn(true);
-    country.setSortColumn(true);
-    country.setSchemaOrdinal(schemaOrdinal++);
-    if (sortColumns.contains(country.getColumnName())) {
-      country.setSortColumn(true);
-    }
-    country.setColumnReferenceId(country.getColumnUniqueId());
-    columnSchemas.add(country);
-
-    ColumnSchema name = new ColumnSchema();
-    name.setColumnName("name");
-    name.setDataType(DataTypes.STRING);
-    name.setEncodingList(encodings);
-    name.setColumnUniqueId(UUID.randomUUID().toString());
-    name.setDimensionColumn(true);
-    name.setSchemaOrdinal(schemaOrdinal++);
-    if (sortColumns.contains(name.getColumnName())) {
-      name.setSortColumn(true);
-    }
-    name.setColumnReferenceId(name.getColumnUniqueId());
-    columnSchemas.add(name);
-
-    ColumnSchema phonetype = new ColumnSchema();
-    phonetype.setColumnName("phonetype");
-    phonetype.setDataType(DataTypes.STRING);
-    phonetype.setEncodingList(encodings);
-    phonetype.setColumnUniqueId(UUID.randomUUID().toString());
-    phonetype.setDimensionColumn(true);
-    phonetype.setSchemaOrdinal(schemaOrdinal++);
-    if (sortColumns.contains(phonetype.getColumnName())) {
-      phonetype.setSortColumn(true);
-    }
-    phonetype.setColumnReferenceId(phonetype.getColumnUniqueId());
-    columnSchemas.add(phonetype);
-
-    ColumnSchema serialname = new ColumnSchema();
-    serialname.setColumnName("serialname");
-    serialname.setDataType(DataTypes.STRING);
-    serialname.setEncodingList(encodings);
-    serialname.setColumnUniqueId(UUID.randomUUID().toString());
-    serialname.setDimensionColumn(true);
-    serialname.setSchemaOrdinal(schemaOrdinal++);
-    if (sortColumns.contains(serialname.getColumnName())) {
-      serialname.setSortColumn(true);
-    }
-    serialname.setColumnReferenceId(serialname.getColumnUniqueId());
-    columnSchemas.add(serialname);
-    ColumnSchema salary = new ColumnSchema();
-    salary.setColumnName("salary");
-    salary.setDataType(DataTypes.INT);
-    salary.setEncodingList(new ArrayList<Encoding>());
-    salary.setColumnUniqueId(UUID.randomUUID().toString());
-    salary.setDimensionColumn(false);
-    salary.setColumnReferenceId(salary.getColumnUniqueId());
-    salary.setSchemaOrdinal(schemaOrdinal++);
-    columnSchemas.add(salary);
-
+    addColumn(columnSchemas, encodings, schemaOrdinal++, "id", DataTypes.INT, true);
+    addColumn(columnSchemas, encodings, schemaOrdinal++, "date", DataTypes.STRING, true);
+    addColumn(columnSchemas, encodings, schemaOrdinal++, "country", DataTypes.STRING, true);
+    addColumn(columnSchemas, encodings, schemaOrdinal++, "name", DataTypes.STRING, true);
+    addColumn(columnSchemas, encodings, schemaOrdinal++, "phonetype", DataTypes.STRING, true);
+    addColumn(columnSchemas, encodings, schemaOrdinal++, "serialname", DataTypes.STRING, true);
+    addColumn(columnSchemas, encodings, schemaOrdinal, "salary", DataTypes.INT, false);
     // rearrange the column schema based on the sort order, if sort columns exists
     List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas);
     tableSchema.setListOfColumns(columnSchemas1);
-    SchemaEvolution schemaEvol = new SchemaEvolution();
-    schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
-    tableSchema.setSchemaEvolution(schemaEvol);
+    SchemaEvolution schemaEvolution = new SchemaEvolution();
+    schemaEvolution.setSchemaEvolutionEntryList(new ArrayList<>());
+    tableSchema.setSchemaEvolution(schemaEvolution);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
         identifier.getCarbonTableIdentifier().getTableUniqueName()
@@ -328,6 +247,22 @@
     return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
   }
 
+  private void addColumn(List<ColumnSchema> columnSchemas, ArrayList<Encoding> encodings,
+      int schemaOrdinal, String name2, DataType dataType, boolean isDimensionColumn) {
+    ColumnSchema name = new ColumnSchema();
+    name.setColumnName(name2);
+    name.setDataType(dataType);
+    name.setEncodingList(encodings);
+    name.setColumnUniqueId(UUID.randomUUID().toString());
+    name.setColumnReferenceId(name.getColumnUniqueId());
+    name.setDimensionColumn(isDimensionColumn);
+    name.setSchemaOrdinal(schemaOrdinal);
+    if (sortColumns.contains(name.getColumnName())) {
+      name.setSortColumn(true);
+    }
+    columnSchemas.add(name);
+  }
+
   private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
     List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
     // add sort columns first
@@ -357,10 +292,6 @@
 
   /**
    * Execute graph which will further load data
-   *
-   * @param loadModel
-   * @param storeLocation
-   * @throws Exception
    */
   public static void loadData(CarbonLoadModel loadModel, String storeLocation)
       throws Exception {
@@ -425,7 +356,7 @@
 
     writeLoadMetadata(
         loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
-        new ArrayList<LoadMetadataDetails>());
+        new ArrayList<>());
   }
 
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
@@ -460,13 +391,8 @@
       writeOperation.setFailed();
       throw ioe;
     } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        throw e;
-
+      if (null != brWriter) {
+        brWriter.flush();
       }
       CarbonUtil.closeStreams(brWriter);
 
@@ -477,11 +403,7 @@
 
   public static String readCurrentTime() {
     SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
+    return sdf.format(new Date());
   }
 
   public static void main(String[] args) throws Exception {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 9916a18..da47981 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -21,11 +21,8 @@
 import java.text.SimpleDateFormat;
 import java.util.Locale;
 
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.index.IndexUtil;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,31 +30,11 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.log4j.Logger;
 
 /**
  * Utility class
  */
 public class CarbonInputFormatUtil {
-
-  /**
-   * Attribute for Carbon LOGGER.
-   */
-  private static final Logger LOGGER =
-      LogServiceFactory.getLogService(CarbonProperties.class.getName());
-
-  public static <V> CarbonFileInputFormat<V> createCarbonFileInputFormat(
-      AbsoluteTableIdentifier identifier, Job job) throws IOException {
-    CarbonFileInputFormat<V> carbonInputFormat = new CarbonFileInputFormat<V>();
-    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
-        identifier.getCarbonTableIdentifier().getDatabaseName());
-    CarbonTableInputFormat
-        .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
-    FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
-    setIndexJobIfConfigured(job.getConfiguration());
-    return carbonInputFormat;
-  }
-
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
@@ -73,9 +50,6 @@
 
   /**
    * This method set IndexJob if configured
-   *
-   * @param conf
-   * @throws IOException
    */
   public static void setIndexJobIfConfigured(Configuration conf) throws IOException {
     String className = "org.apache.carbondata.indexserver.EmbeddedIndexJob";
@@ -87,8 +61,7 @@
   }
 
   public static JobID getJobId(java.util.Date date, int batch) {
-    String jobtrackerID = createJobTrackerID(date);
-    return new JobID(jobtrackerID, batch);
+    String jobTrackerID = createJobTrackerID(date);
+    return new JobID(jobTrackerID, batch);
   }
-
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
index 72a8d09..d21a25a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java
@@ -51,7 +51,7 @@
 
   @Override
   public String[] getLocations() {
-    Set<String> locations = new HashSet<String>();
+    Set<String> locations = new HashSet<>();
     for (CarbonInputSplit splitInfo : carbonBlockInfoList) {
       try {
         locations.addAll(Arrays.asList(splitInfo.getLocations()));
@@ -90,55 +90,47 @@
 
   /**
    * Finding which node has the maximum number of blocks for it.
-   *
-   * @param splitList
-   * @return
    */
   public static List<String> maxNoNodes(List<CarbonInputSplit> splitList) {
     boolean useIndex = true;
-    Integer maxOccurence = 0;
+    Integer maxOccurrence = 0;
     String maxNode = null;
-    Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>();
+    Map<String, Integer> nodeAndOccurrenceMapping = new TreeMap<>();
 
-    // populate the map of node and number of occurences of that node.
+    // populate the map of node and number of occurrences of that node.
     for (CarbonInputSplit split : splitList) {
       try {
         for (String node : split.getLocations()) {
-          Integer nodeOccurence = nodeAndOccurenceMapping.get(node);
-          if (null == nodeOccurence) {
-            nodeAndOccurenceMapping.put(node, 1);
-          } else {
-            nodeOccurence++;
-          }
+          nodeAndOccurrenceMapping.putIfAbsent(node, 1);
         }
       } catch (IOException e) {
         throw new RuntimeException("Fail to get location of split: " + split, e);
       }
     }
-    Integer previousValueOccurence = null;
+    Integer previousValueOccurrence = null;
 
-    // check which node is occured maximum times.
-    for (Map.Entry<String, Integer> entry : nodeAndOccurenceMapping.entrySet()) {
+    // check which node is occurred maximum times.
+    for (Map.Entry<String, Integer> entry : nodeAndOccurrenceMapping.entrySet()) {
       // finding the maximum node.
-      if (entry.getValue() > maxOccurence) {
-        maxOccurence = entry.getValue();
+      if (entry.getValue() > maxOccurrence) {
+        maxOccurrence = entry.getValue();
         maxNode = entry.getKey();
       }
-      // first time scenario. initialzing the previous value.
-      if (null == previousValueOccurence) {
-        previousValueOccurence = entry.getValue();
+      // first time scenario. initializing the previous value.
+      if (null == previousValueOccurrence) {
+        previousValueOccurrence = entry.getValue();
       } else {
         // for the case where all the nodes have same number of blocks then
         // we need to return complete list instead of max node.
-        if (!Objects.equals(previousValueOccurence, entry.getValue())) {
+        if (!Objects.equals(previousValueOccurrence, entry.getValue())) {
           useIndex = false;
         }
       }
     }
 
-    // if all the nodes have equal occurence then returning the complete key set.
+    // if all the nodes have equal occurrence then returning the complete key set.
     if (useIndex) {
-      return new ArrayList<>(nodeAndOccurenceMapping.keySet());
+      return new ArrayList<>(nodeAndOccurrenceMapping.keySet());
     }
 
     // if any max node is found then returning the max node.
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 54f4bf7..491094d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -69,10 +69,10 @@
 
   private AbstractDetailQueryResultIterator iterator;
 
-  private QueryModel queryModel;
+  private final QueryModel queryModel;
   //This holds mapping of  fetch index with respect to project col index.
   // it is used when same col is used in projection many times.So need to fetch only that col.
-  private List<Integer> projectionMapping = new ArrayList<>();
+  private final List<Integer> projectionMapping = new ArrayList<>();
 
   public CarbonVectorizedRecordReader(QueryModel queryModel) {
     this.queryModel = queryModel;
@@ -171,16 +171,16 @@
       }
       CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
 
-      Map<String, Integer> colmap = new HashMap<>();
+      Map<String, Integer> columnMap = new HashMap<>();
       for (int i = 0; i < fields.length; i++) {
         vectors[i] = new CarbonColumnVectorImpl(
                 CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
                 fields[i].getDataType());
-        if (colmap.containsKey(fields[i].getFieldName())) {
-          int reusedIndex = colmap.get(fields[i].getFieldName());
+        if (columnMap.containsKey(fields[i].getFieldName())) {
+          int reusedIndex = columnMap.get(fields[i].getFieldName());
           projectionMapping.add(reusedIndex);
         } else {
-          colmap.put(fields[i].getFieldName(), i);
+          columnMap.put(fields[i].getFieldName(), i);
           projectionMapping.add(i);
         }
       }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5424b66..4d1f3a3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -193,7 +193,7 @@
         logInfo(
           s"""
              | Identified no.of.streaming splits/tasks: ${ streamPartitions.size },
-             | no.of.streaming files: ${format.getHitedStreamFiles},
+             | no.of.streaming files: ${format.getHitStreamFiles},
              | no.of.total streaming files: ${format.getNumStreamFiles},
              | no.of.total streaming segement: ${format.getNumStreamSegments}
           """.stripMargin)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index d9c2633..e25c6e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -174,7 +174,7 @@
     // in load status update time
     val updateTimeStamp = options.get("updatetimestamp")
     if (updateTimeStamp.isDefined) {
-      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+      conf.set(CarbonTableOutputFormat.UPDATE_TIMESTAMP, updateTimeStamp.get)
     }
     conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
     CarbonTableOutputFormat.setLoadModel(conf, model)